Skip to content

Commit

Permalink
instrument evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Oct 11, 2024
1 parent 770edf3 commit f96dab9
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions turbopack/crates/turbopack-node/src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use indexmap::indexmap;
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value as JsonValue;
use tracing::{trace_span, Instrument};
use turbo_tasks::{
duration_span, mark_finished, prevent_gc, util::SharedError, Completion, RawVc, TaskInput,
TryJoinIterExt, Value, Vc,
Expand Down Expand Up @@ -308,6 +309,7 @@ pub fn evaluate(
})
}

#[tracing::instrument(skip_all)]
pub async fn compute(
evaluate_context: impl EvaluateContext,
sender: Vc<JavaScriptStreamSender>,
Expand All @@ -324,9 +326,9 @@ pub async fn compute(

// Read this strongly consistent, since we don't want to run inconsistent
// node.js code.
let pool = pool.strongly_consistent().await?;
let pool = pool.strongly_consistent().instrument(tracing::trace_span!("pool")).await?;

let args = evaluate_context.args().iter().try_join().await?;
let args = evaluate_context.args().iter().try_join().instrument(tracing::trace_span!("args")).await?;
// Assume this is a one-off operation, so we can kill the process
// TODO use a better way to decide that.
let kill = !evaluate_context.keep_alive();
Expand All @@ -348,14 +350,15 @@ pub async fn compute(
},
PoolErrorHandler,
)
.instrument(trace_span!("evaluate"))
.await
.map_err(|(e, _)| e)?;

// The evaluation sent an initial intermediate value without completing. We'll
// need to spawn a new thread to continually pull data out of the process,
// and ferry that along.
loop {
let output = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
let output = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).instrument(trace_span!("pull")).await?;

match output {
LoopResult::Continue(data) => {
Expand All @@ -376,11 +379,14 @@ pub async fn compute(
}
}

evaluate_context.finish(state, &pool).await?;
async move {
evaluate_context.finish(state, &pool).await?;

if kill {
operation.wait_or_kill().await?;
}
if kill {
operation.wait_or_kill().await?;
}
anyhow::Ok(())
}.instrument(tracing::trace_span!("finish")).await?;
};

let mut sender = (sender.get)();
Expand All @@ -399,6 +405,7 @@ pub async fn compute(

/// Repeatedly pulls from the NodeJsOperation until we receive a
/// value/error/end.
#[tracing::instrument(skip_all)]
async fn pull_operation<T: EvaluateContext>(
operation: &mut NodeJsOperation,
pool: &NodeJsPool,
Expand Down

0 comments on commit f96dab9

Please sign in to comment.