Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: hook up process manager #6043

Merged
merged 8 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/turborepo-lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,8 @@ pub async fn run(

if args.experimental_rust_codepath {
use crate::commands::run;
run::run(base).await?;
Ok(Payload::Rust(Ok(0)))
let exit_code = run::run(base).await?;
Ok(Payload::Rust(Ok(exit_code)))
} else {
Ok(Payload::Go(Box::new(base)))
}
Expand Down
4 changes: 2 additions & 2 deletions crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use tracing::{debug, error};

use crate::{commands::CommandBase, run::Run};

pub async fn run(base: CommandBase) -> Result<()> {
pub async fn run(base: CommandBase) -> Result<i32> {
let mut run = Run::new(&base);
debug!("using the experimental rust codepath");
debug!("configured run struct: {:?}", run);

match run.run().await {
Ok(_code) => Ok(()),
Ok(code) => Ok(code),
Err(err) => {
error!("run failed: {}", err);
Err(err)
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub use builder::EngineBuilder;
pub use execute::{ExecuteError, ExecutionOptions, Message};
pub use execute::{ExecuteError, ExecutionOptions, Message, StopExecution};
use petgraph::Graph;

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct RunOpts<'a> {
// Whether or not to infer the framework for each workspace.
pub(crate) framework_inference: bool,
pub profile: Option<&'a str>,
continue_on_error: bool,
pub(crate) continue_on_error: bool,
pub(crate) pass_through_args: &'a [String],
pub(crate) only: bool,
pub(crate) dry_run: bool,
Expand Down
19 changes: 19 additions & 0 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

use command_group::AsyncCommandGroup;
use futures::future::try_join3;
use itertools::Itertools;
pub use tokio::process::Command;
use tokio::{
io::{AsyncBufReadExt, AsyncRead, BufReader},
Expand Down Expand Up @@ -84,7 +85,7 @@
/// channel when the child process exits.
async fn process(&self, child: &mut tokio::process::Child) -> ChildState {
match self {
ShutdownStyle::Graceful(timeout) => {

Check warning on line 88 in crates/turborepo-lib/src/process/child.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

unused variable: `timeout`

Check warning on line 88 in crates/turborepo-lib/src/process/child.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

unused variable: `timeout`
// try ro run the command for the given timeout
#[cfg(unix)]
{
Expand Down Expand Up @@ -148,6 +149,7 @@
stdin: Arc<Mutex<Option<tokio::process::ChildStdin>>>,
stdout: Arc<Mutex<Option<tokio::process::ChildStdout>>>,
stderr: Arc<Mutex<Option<tokio::process::ChildStderr>>>,
label: String,
}

#[derive(Debug)]
Expand Down Expand Up @@ -177,6 +179,18 @@
/// Start a child process, returning a handle that can be used to interact
/// with it. The command will be started immediately.
pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> io::Result<Self> {
let label = {
let cmd = command.as_std();
format!(
"({}) {} {}",
cmd.get_current_dir()
.map(|dir| dir.to_string_lossy())
.unwrap_or_default(),
cmd.get_program().to_string_lossy(),
cmd.get_args().map(|s| s.to_string_lossy()).join(" ")
)
Comment on lines +184 to +191
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a port from Go so we can match outputs. Note that Go includes the program in Args while Rust does not so we must add it explicitly

};

let group = command.group().spawn()?;

let gid = group.id();
Expand Down Expand Up @@ -266,6 +280,7 @@
stdin: Arc::new(Mutex::new(stdin)),
stdout: Arc::new(Mutex::new(stdout)),
stderr: Arc::new(Mutex::new(stderr)),
label,
})
}

Expand Down Expand Up @@ -381,6 +396,10 @@

Ok(exit)
}

pub fn label(&self) -> &str {
&self.label
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::Future;
use tokio::task::JoinSet;
use tracing::{debug, trace};

use self::child::{Child, ChildExit};
pub use self::child::{Child, ChildExit};

/// A process manager that is responsible for spawning and managing child
/// processes. When the manager is Open, new child processes can be spawned
Expand Down
23 changes: 18 additions & 5 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod scope;
mod summary;
pub mod task_id;
use std::{
io::{BufWriter, IsTerminal},
io::{BufWriter, IsTerminal, Write},
sync::Arc,
};

Expand Down Expand Up @@ -62,7 +62,7 @@ impl<'a> Run<'a> {
self.base.args().try_into()
}

pub async fn run(&mut self) -> Result<()> {
pub async fn run(&mut self) -> Result<i32> {
let start_at = Local::now();
let package_json_path = self.base.repo_root.join_component("package.json");
let root_package_json =
Expand Down Expand Up @@ -206,7 +206,7 @@ impl<'a> Run<'a> {
engine.dot_graph(std::io::stdout(), opts.run_opts.single_package)?
}
}
return Ok(());
return Ok(0);
}

let root_workspace = pkg_dep_graph
Expand Down Expand Up @@ -279,9 +279,22 @@ impl<'a> Run<'a> {
global_env_mode,
self.base.ui,
false,
self.processes.clone(),
&self.base.repo_root,
);

visitor.visit(engine.clone()).await?;
let errors = visitor.visit(engine.clone()).await?;

let exit_code = errors
.iter()
.filter_map(|err| err.exit_code())
.max()
// We hit some error, it shouldn't be exit code 0
.unwrap_or(if errors.is_empty() { 0 } else { 1 });

for err in &errors {
writeln!(std::io::stderr(), "{err}").ok();
}

let pass_through_env = global_hash_inputs.pass_through_env.unwrap_or_default();
let resolved_pass_through_env_vars =
Expand Down Expand Up @@ -312,6 +325,6 @@ impl<'a> Run<'a> {

run_summary.close(0, &pkg_dep_graph, self.base.ui)?;

Ok(())
Ok(exit_code)
}
}
Loading
Loading