Skip to content

Commit

Permalink
feat: hook up process manager (#6043)
Browse files Browse the repository at this point in the history
### Description

Hook up the process manager to the task graph visitor and start
executing tasks.

The PR includes some tangentially related changes that only impact our
terminal output during a run so it made sense to bundle in this PR.

### Testing Instructions

Run tasks using Rust! 
```
[0 olszewski@chriss-mbp] /tmp/pnpm-test $ turbo_dev lint --experimental-rust-codepath
web:lint: 
web:lint: > web@1.0.0 lint /private/tmp/pnpm-test/apps/web
web:lint: > next lint
web:lint: 
ui:lint: 
ui:lint: > ui@0.0.0 lint /private/tmp/pnpm-test/packages/ui
ui:lint: > eslint .
ui:lint: 
docs:lint: 
docs:lint: > docs@1.0.0 lint /private/tmp/pnpm-test/apps/docs
docs:lint: > next lint
docs:lint: 
docs:lint: ✔ No ESLint warnings or errors
web:lint: ✔ No ESLint warnings or errors
```

You can also attempt running error integration tests specifically the
one that is useful is `tests/run/one-script-error.t` (via
`EXPERIMENTAL_RUST_CODEPATH=true .cram_env/bin/prysk --shell=bash
tests/run/one-script-error.t`) as it tests execution cancelling due to
errors and `--continue` usage. Note that the test won't pass as the task
cache and run summary isn't hooked up, but modulo those the logs are
matching.


Closes TURBO-1369

---------

Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
chris-olszewski committed Oct 2, 2023
1 parent dc1e2a7 commit b240354
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 21 deletions.
4 changes: 2 additions & 2 deletions crates/turborepo-lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,8 @@ pub async fn run(

if args.experimental_rust_codepath {
use crate::commands::run;
run::run(base).await?;
Ok(Payload::Rust(Ok(0)))
let exit_code = run::run(base).await?;
Ok(Payload::Rust(Ok(exit_code)))
} else {
Ok(Payload::Go(Box::new(base)))
}
Expand Down
4 changes: 2 additions & 2 deletions crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use tracing::{debug, error};

use crate::{commands::CommandBase, run::Run};

pub async fn run(base: CommandBase) -> Result<()> {
pub async fn run(base: CommandBase) -> Result<i32> {
let mut run = Run::new(&base);
debug!("using the experimental rust codepath");
debug!("configured run struct: {:?}", run);

match run.run().await {
Ok(_code) => Ok(()),
Ok(code) => Ok(code),
Err(err) => {
error!("run failed: {}", err);
Err(err)
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub use builder::EngineBuilder;
pub use execute::{ExecuteError, ExecutionOptions, Message};
pub use execute::{ExecuteError, ExecutionOptions, Message, StopExecution};
use petgraph::Graph;

use crate::{
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct RunOpts<'a> {
// Whether or not to infer the framework for each workspace.
pub(crate) framework_inference: bool,
pub profile: Option<&'a str>,
continue_on_error: bool,
pub(crate) continue_on_error: bool,
pub(crate) pass_through_args: &'a [String],
pub(crate) only: bool,
pub(crate) dry_run: bool,
Expand Down
19 changes: 19 additions & 0 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{

use command_group::AsyncCommandGroup;
use futures::future::try_join3;
use itertools::Itertools;
pub use tokio::process::Command;
use tokio::{
io::{AsyncBufReadExt, AsyncRead, BufReader},
Expand Down Expand Up @@ -148,6 +149,7 @@ pub struct Child {
stdin: Arc<Mutex<Option<tokio::process::ChildStdin>>>,
stdout: Arc<Mutex<Option<tokio::process::ChildStdout>>>,
stderr: Arc<Mutex<Option<tokio::process::ChildStderr>>>,
label: String,
}

#[derive(Debug)]
Expand Down Expand Up @@ -177,6 +179,18 @@ impl Child {
/// Start a child process, returning a handle that can be used to interact
/// with it. The command will be started immediately.
pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> io::Result<Self> {
let label = {
let cmd = command.as_std();
format!(
"({}) {} {}",
cmd.get_current_dir()
.map(|dir| dir.to_string_lossy())
.unwrap_or_default(),
cmd.get_program().to_string_lossy(),
cmd.get_args().map(|s| s.to_string_lossy()).join(" ")
)
};

let group = command.group().spawn()?;

let gid = group.id();
Expand Down Expand Up @@ -266,6 +280,7 @@ impl Child {
stdin: Arc::new(Mutex::new(stdin)),
stdout: Arc::new(Mutex::new(stdout)),
stderr: Arc::new(Mutex::new(stderr)),
label,
})
}

Expand Down Expand Up @@ -381,6 +396,10 @@ impl Child {

Ok(exit)
}

pub fn label(&self) -> &str {
&self.label
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::Future;
use tokio::task::JoinSet;
use tracing::{debug, trace};

use self::child::{Child, ChildExit};
pub use self::child::{Child, ChildExit};

/// A process manager that is responsible for spawning and managing child
/// processes. When the manager is Open, new child processes can be spawned
Expand Down
23 changes: 18 additions & 5 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod scope;
mod summary;
pub mod task_id;
use std::{
io::{BufWriter, IsTerminal},
io::{BufWriter, IsTerminal, Write},
sync::Arc,
};

Expand Down Expand Up @@ -62,7 +62,7 @@ impl<'a> Run<'a> {
self.base.args().try_into()
}

pub async fn run(&mut self) -> Result<()> {
pub async fn run(&mut self) -> Result<i32> {
let start_at = Local::now();
let package_json_path = self.base.repo_root.join_component("package.json");
let root_package_json =
Expand Down Expand Up @@ -206,7 +206,7 @@ impl<'a> Run<'a> {
engine.dot_graph(std::io::stdout(), opts.run_opts.single_package)?
}
}
return Ok(());
return Ok(0);
}

let root_workspace = pkg_dep_graph
Expand Down Expand Up @@ -279,9 +279,22 @@ impl<'a> Run<'a> {
global_env_mode,
self.base.ui,
false,
self.processes.clone(),
&self.base.repo_root,
);

visitor.visit(engine.clone()).await?;
let errors = visitor.visit(engine.clone()).await?;

let exit_code = errors
.iter()
.filter_map(|err| err.exit_code())
.max()
// We hit some error, it shouldn't be exit code 0
.unwrap_or(if errors.is_empty() { 0 } else { 1 });

for err in &errors {
writeln!(std::io::stderr(), "{err}").ok();
}

let pass_through_env = global_hash_inputs.pass_through_env.unwrap_or_default();
let resolved_pass_through_env_vars =
Expand Down Expand Up @@ -312,6 +325,6 @@ impl<'a> Run<'a> {

run_summary.close(0, &pkg_dep_graph, self.base.ui)?;

Ok(())
Ok(exit_code)
}
}
Loading

0 comments on commit b240354

Please sign in to comment.