From 3d1e0d33abf1a2559aa6ecd0baec6dbc36caa6f6 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Sun, 12 Sep 2021 10:30:44 -0700 Subject: [PATCH] async-ify NailgunPool::connect and caller. [ci skip-build-wheels] --- .../process_execution/src/nailgun/mod.rs | 91 ++++---- .../src/nailgun/nailgun_pool.rs | 207 +++++++++--------- 2 files changed, 142 insertions(+), 156 deletions(-) diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index d07a403ec14..7533107fb04 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -26,7 +26,6 @@ mod parsed_jvm_command_lines; #[cfg(test)] mod parsed_jvm_command_lines_tests; -use async_semaphore::AsyncSemaphore; pub use nailgun_pool::NailgunPool; use parsed_jvm_command_lines::ParsedJVMCommandLines; use std::net::SocketAddr; @@ -93,7 +92,6 @@ fn construct_nailgun_client_request( pub struct CommandRunner { inner: super::local::CommandRunner, nailgun_pool: NailgunPool, - async_semaphore: async_semaphore::AsyncSemaphore, metadata: ProcessMetadata, workdir_base: PathBuf, executor: task_executor::Executor, @@ -109,7 +107,6 @@ impl CommandRunner { CommandRunner { inner: runner, nailgun_pool: NailgunPool::new(), - async_semaphore: AsyncSemaphore::new(1), metadata, workdir_base, executor, @@ -226,56 +223,50 @@ impl CapturedWorkdir for CommandRunner { &self.metadata, ); - let nailgun_pool = self.nailgun_pool.clone(); - let req2 = req.clone(); let workdir_for_this_nailgun = self.get_nailgun_workdir(&nailgun_name)?; - let build_id = context.build_id; - let store = self.inner.store.clone(); - let mut child = self - .async_semaphore - .clone() - .with_acquired(move |_id| { - // Get the port of a running nailgun server (or a new nailgun server if it doesn't exist) - nailgun_pool.connect( - nailgun_name.clone(), - nailgun_req, - workdir_for_this_nailgun, - nailgun_req_digest, - build_id, - store, - req.input_files, - ) - }) - .map_err(|e| format!("Failed to connect to nailgun! {}", e)) - .inspect(move |_| debug!("Connected to nailgun instance {}", &nailgun_name3)) - .and_then(move |nailgun_port| { - // Run the client request in the nailgun we have active. - debug!("Got nailgun port {} for {}", nailgun_port, nailgun_name2); - let client_req = construct_nailgun_client_request(req2, client_main_class, client_args); - let cmd = Command { - command: client_req.argv[0].clone(), - args: client_req.argv[1..].to_vec(), - env: client_req - .env - .iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(), - working_dir: client_workdir, - }; - trace!("Client request: {:#?}", client_req); - let addr: SocketAddr = format!("127.0.0.1:{:?}", nailgun_port).parse().unwrap(); - debug!("Connecting to server at {}...", addr); - TcpStream::connect(addr) - .and_then(move |stream| { - nails::client::handle_connection(nails::Config::default(), stream, cmd, async { - let (_stdin_write, stdin_read) = child_channel::(); - stdin_read - }) + // Get the port of a running nailgun server (or a new nailgun server if it doesn't exist) + let nailgun_port = self + .nailgun_pool + .connect( + nailgun_name.clone(), + nailgun_req, + workdir_for_this_nailgun, + nailgun_req_digest, + context.build_id, + self.inner.store.clone(), + req.input_files, + ) + .await + .map_err(|e| format!("Failed to connect to nailgun! {}", e))?; + debug!("Connected to nailgun instance {}", &nailgun_name3); + let mut child = { + // Run the client request in the nailgun we have active. + debug!("Got nailgun port {} for {}", nailgun_port, nailgun_name2); + let client_req = construct_nailgun_client_request(req, client_main_class, client_args); + let cmd = Command { + command: client_req.argv[0].clone(), + args: client_req.argv[1..].to_vec(), + env: client_req + .env + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(), + working_dir: client_workdir, + }; + trace!("Client request: {:#?}", client_req); + let addr: SocketAddr = format!("127.0.0.1:{:?}", nailgun_port).parse().unwrap(); + debug!("Connecting to server at {}...", addr); + TcpStream::connect(addr) + .and_then(move |stream| { + nails::client::handle_connection(nails::Config::default(), stream, cmd, async { + let (_stdin_write, stdin_read) = child_channel::(); + stdin_read }) - .map_err(|e| format!("Error communicating with server: {}", e)) - }) - .await?; + }) + .map_err(|e| format!("Error communicating with server: {}", e)) + .await? + }; let output_stream = child .output_stream diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index 15c12eb0dfa..a0922c0dad6 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -19,7 +19,6 @@ use parking_lot::Mutex; use regex::Regex; use sha2::{Digest as Sha256Digest, Sha256}; use store::Store; -use tryfuture::try_future; use crate::Process; @@ -92,7 +91,7 @@ impl NailgunPool { /// If the server is not running, or if it's running with a different configuration, /// this code will start a new server as a side effect. /// - pub fn connect( + pub async fn connect( &self, name: NailgunProcessName, startup_options: Process, @@ -101,123 +100,119 @@ impl NailgunPool { build_id_requesting_connection: String, store: Store, input_files: Digest, - ) -> BoxFuture<'static, Result> { + ) -> Result { let processes = self.processes.clone(); - let jdk_path = try_future!(startup_options.jdk_home.clone().ok_or_else(|| { + let jdk_path = startup_options.jdk_home.clone().ok_or_else(|| { format!( "jdk_home is not set for nailgun server startup request {:#?}", &startup_options ) - })); - let requested_server_fingerprint = try_future!(NailgunProcessFingerprint::new( - nailgun_req_digest, - jdk_path.clone() - )); + })?; + let requested_server_fingerprint = + NailgunProcessFingerprint::new(nailgun_req_digest, jdk_path.clone())?; - Self::materialize_workdir_for_server( - store, workdir_path.clone(), jdk_path, input_files - ).and_then(move |_| async move { - debug!("Locking nailgun process pool so that only one can be connecting at a time."); - let mut processes = processes.lock(); - debug!("Locked!"); - let connection_result = if let Some(process) = processes.get_mut(&name) { - // Clone some fields that we need for later - let (process_name, process_fingerprint, process_port, build_id_that_started_the_server) = ( - process.name.clone(), - process.fingerprint.clone(), - process.port, - process.build_id.clone(), - ); + Self::materialize_workdir_for_server(store, workdir_path.clone(), jdk_path, input_files) + .await?; - debug!( - "Checking if nailgun server {} is still alive at port {}...", - &process_name, process_port - ); + debug!("Locking nailgun process pool so that only one can be connecting at a time."); + let mut processes = processes.lock(); + debug!("Locked!"); + let connection_result = if let Some(process) = processes.get_mut(&name) { + // Clone some fields that we need for later + let (process_name, process_fingerprint, process_port, build_id_that_started_the_server) = ( + process.name.clone(), + process.fingerprint.clone(), + process.port, + process.build_id.clone(), + ); + + debug!( + "Checking if nailgun server {} is still alive at port {}...", + &process_name, process_port + ); - // If the process is in the map, check if it's alive using the handle. - let status = { - process - .handle - .lock() - .try_wait() - .map_err(|e| format!("Error getting the process status! {}", e)) - }; - match status { - Ok(None) => { - // Process hasn't exited yet - debug!( - "Found nailgun process {}, with fingerprint {:?}", - &name, process_fingerprint - ); - if requested_server_fingerprint == process_fingerprint { - debug!("The fingerprint of the running nailgun {:?} matches the requested fingerprint {:?}. Connecting to existing server.", - requested_server_fingerprint, process_fingerprint); - Ok(process_port) + // If the process is in the map, check if it's alive using the handle. + let status = { + process + .handle + .lock() + .try_wait() + .map_err(|e| format!("Error getting the process status! {}", e)) + }; + match status { + Ok(None) => { + // Process hasn't exited yet + debug!( + "Found nailgun process {}, with fingerprint {:?}", + &name, process_fingerprint + ); + if requested_server_fingerprint == process_fingerprint { + debug!("The fingerprint of the running nailgun {:?} matches the requested fingerprint {:?}. Connecting to existing server.", + requested_server_fingerprint, process_fingerprint); + Ok(process_port) + } else { + // The running process doesn't coincide with the options we want. + if build_id_that_started_the_server == build_id_requesting_connection { + Err(format!( + "Trying to change the JVM options for a running nailgun server that was started this run, with name {}.\ + There is exactly one nailgun server per task, so it shouldn't be possible to change the options of a nailgun server mid-run.\ + This might be a problem with how we calculate the keys of nailgun servers (https://github.com/pantsbuild/pants/issues/8527).", + &name) + ) } else { - // The running process doesn't coincide with the options we want. - if build_id_that_started_the_server == build_id_requesting_connection { - Err(format!( - "Trying to change the JVM options for a running nailgun server that was started this run, with name {}.\ - There is exactly one nailgun server per task, so it shouldn't be possible to change the options of a nailgun server mid-run.\ - This might be a problem with how we calculate the keys of nailgun servers (https://github.com/pantsbuild/pants/issues/8527).", - &name) - ) - } else { - // Restart it. - // Since the stored server was started in a different pants run, - // no client will be running on that server. - debug!( - "The options for server process {} are different to the startup_options, \ - and the original process was started in a different pants run.\n\ - Startup Options: {:?}\n Process Cmd: {:?}", - &process_name, startup_options, process_fingerprint - ); - debug!("Restarting the server..."); - Self::start_new_nailgun( - &mut *processes, - name, - startup_options, - &workdir_path, - requested_server_fingerprint, - build_id_requesting_connection, - ) - } + // Restart it. + // Since the stored server was started in a different pants run, + // no client will be running on that server. + debug!( + "The options for server process {} are different to the startup_options, \ + and the original process was started in a different pants run.\n\ + Startup Options: {:?}\n Process Cmd: {:?}", + &process_name, startup_options, process_fingerprint + ); + debug!("Restarting the server..."); + Self::start_new_nailgun( + &mut *processes, + name, + startup_options, + &workdir_path, + requested_server_fingerprint, + build_id_requesting_connection, + ) } } - Ok(_) => { - // The process has exited with some exit code - debug!("The requested nailgun server was not running anymore. Restarting process..."); - Self::start_new_nailgun( - &mut *processes, - name, - startup_options, - &workdir_path, - requested_server_fingerprint, - build_id_requesting_connection, - ) - } - Err(e) => Err(e), } - } else { - // We don't have a running nailgun registered in the map. - debug!( - "No nailgun server is running with name {}. Starting one...", - &name - ); - Self::start_new_nailgun( - &mut *processes, - name, - startup_options, - &workdir_path, - requested_server_fingerprint, - build_id_requesting_connection, - ) - }; - debug!("Unlocking nailgun process pool."); - connection_result - }) - .boxed() + Ok(_) => { + // The process has exited with some exit code + debug!("The requested nailgun server was not running anymore. Restarting process..."); + Self::start_new_nailgun( + &mut *processes, + name, + startup_options, + &workdir_path, + requested_server_fingerprint, + build_id_requesting_connection, + ) + } + Err(e) => Err(e), + } + } else { + // We don't have a running nailgun registered in the map. + debug!( + "No nailgun server is running with name {}. Starting one...", + &name + ); + Self::start_new_nailgun( + &mut *processes, + name, + startup_options, + &workdir_path, + requested_server_fingerprint, + build_id_requesting_connection, + ) + }; + debug!("Unlocking nailgun process pool."); + connection_result } //