From 4bb88ce3d33f5c7dba01afa3c2766c12f665261d Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Mon, 5 Aug 2024 17:01:48 -0400 Subject: [PATCH] [turbopack] Minimal implementation of local Vcs (#68469) *This is a migrated PR. This was in the turbo repository before the next.js merge.* **Migrated From:** https://github.com/vercel/turbo/pull/8780 # Description Local Vcs store task-local values inside of the current task's state. The `SharedReference` holding onto their values is dropped when the task exits. The contents of a local Vc must still use a refcounted `SharedReference`/`triomphe::Arc` because they can be turned into `ReadRef` objects (https://turbopack-rust-docs.vercel.sh/rustdoc/turbo_tasks/struct.ReadRef.html), which require a `'static` lifetime. We can experiment with adding a lifetime to `ReadRef` in a future iteration, but there's little advantage to doing this until we have local tasks. # Limitations This implements *just enough* to create and read local Vcs (with a test!). There are many things that are still unimplemented with `todo!()`. Most notably: - We can't resolve a local `Vc` to a `ResolvedVc`. - There's no way to return or pass a local `Vc` as an argument yet. - For safety, we should only allow construction of local `Vc`s in functions where we know that the return value is a `ResolvedValue`. Grand plan: https://www.notion.so/vercel/Resolved-Vcs-Vc-Lifetimes-Local-Vcs-and-Vc-Refcounts-49d666d3f9594017b5b312b87ddc5bff # Memory Usage - This increases the size of `Vc`/`RawVc` from 96 bits to 128 bits. With clever packing this could (in theory) be solved. However, that it increase the number of machine words (2x 64bit), so it might not have any real impact after alignment happens. - This increase the size of `CurrentTaskState`. I suspect this isn't too bad as there should be a smallish number of tasks active at any given time. **I was not able to measure any change to peak memory/heap usage.** Built using ``` cargo build --release -p next-build-test ``` Ran heaptrack on a single page (`/sink`) in `shadcn/ui` with: ``` cd ~/ui/apps/www heaptrack ~/nextpack/target/release/next-build-test run sequential 1 1 '/sink' ``` And analyzed the results with ``` heaptrack --analyze /home/bgw.linux/ui/apps/www/heaptrack.next-build-test.3066837.zst | tail -20 ``` ### Before ``` total runtime: 130.25s. calls to allocation functions: 48553541 (372786/s) temporary memory allocations: 3863919 (29666/s) peak heap memory consumption: 1.13G peak RSS (including heaptrack overhead): 2.69G total memory leaked: 4.96M suppressed leaks: 7.24K ``` ``` total runtime: 135.48s. calls to allocation functions: 48619554 (358863/s) temporary memory allocations: 3883888 (28667/s) peak heap memory consumption: 1.12G peak RSS (including heaptrack overhead): 2.70G total memory leaked: 4.71M suppressed leaks: 7.24K ``` ### After ``` total runtime: 157.20s. calls to allocation functions: 48509638 (308579/s) temporary memory allocations: 3902883 (24827/s) peak heap memory consumption: 1.13G peak RSS (including heaptrack overhead): 2.70G total memory leaked: 4.86M suppressed leaks: 7.24K ``` ``` total runtime: 130.25s. calls to allocation functions: 48553541 (372786/s) temporary memory allocations: 3863919 (29666/s) peak heap memory consumption: 1.13G peak RSS (including heaptrack overhead): 2.69G total memory leaked: 4.96M suppressed leaks: 7.24K ``` # Testing Instructions ``` cargo nextest r -p turbo-tasks -p turbo-tasks-memory ``` --- .../turbo-tasks-macros/src/value_macro.rs | 10 + .../crates/turbo-tasks-memory/src/output.rs | 2 +- .../turbo-tasks-memory/tests/local_cell.rs | 45 +++++ turbopack/crates/turbo-tasks/src/id.rs | 1 + turbopack/crates/turbo-tasks/src/manager.rs | 174 ++++++++++++------ turbopack/crates/turbo-tasks/src/raw_vc.rs | 27 ++- turbopack/crates/turbo-tasks/src/vc/mod.rs | 33 +++- turbopack/crates/turbo-tasks/src/vc/read.rs | 16 ++ 8 files changed, 239 insertions(+), 69 deletions(-) create mode 100644 turbopack/crates/turbo-tasks-memory/tests/local_cell.rs diff --git a/turbopack/crates/turbo-tasks-macros/src/value_macro.rs b/turbopack/crates/turbo-tasks-macros/src/value_macro.rs index 9e79117540894..4f1d309d3faff 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_macro.rs @@ -316,6 +316,16 @@ pub fn value(args: TokenStream, input: TokenStream) -> TokenStream { let content = self; turbo_tasks::Vc::cell_private(#cell_access_content) } + + /// Places a value in a task-local cell stored in the current task. + /// + /// Task-local cells are stored in a task-local arena, and do not persist outside the + /// lifetime of the current task (including child tasks). Task-local cells can be resolved + /// to be converted into normal cells. + #cell_prefix fn local_cell(self) -> turbo_tasks::Vc { + let content = self; + turbo_tasks::Vc::local_cell_private(#cell_access_content) + } }; let into = if let IntoMode::New | IntoMode::Shared = into_mode { diff --git a/turbopack/crates/turbo-tasks-memory/src/output.rs b/turbopack/crates/turbo-tasks-memory/src/output.rs index d1c83f7ef216a..78d198b45fa24 100644 --- a/turbopack/crates/turbo-tasks-memory/src/output.rs +++ b/turbopack/crates/turbo-tasks-memory/src/output.rs @@ -28,7 +28,7 @@ impl Display for OutputContent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { OutputContent::Empty => write!(f, "empty"), - OutputContent::Link(raw_vc) => write!(f, "link {}", raw_vc), + OutputContent::Link(raw_vc) => write!(f, "link {:?}", raw_vc), OutputContent::Error(err) => write!(f, "error {}", err), OutputContent::Panic(Some(message)) => write!(f, "panic {}", message), OutputContent::Panic(None) => write!(f, "panic"), diff --git a/turbopack/crates/turbo-tasks-memory/tests/local_cell.rs b/turbopack/crates/turbo-tasks-memory/tests/local_cell.rs new file mode 100644 index 0000000000000..e2f20b5501fc6 --- /dev/null +++ b/turbopack/crates/turbo-tasks-memory/tests/local_cell.rs @@ -0,0 +1,45 @@ +#![feature(arbitrary_self_types)] + +use turbo_tasks::Vc; +use turbo_tasks_testing::{register, run, Registration}; + +static REGISTRATION: Registration = register!(); + +#[turbo_tasks::value] +struct Wrapper(u32); + +#[turbo_tasks::value(transparent)] +struct TransparentWrapper(u32); + +#[tokio::test] +async fn store_and_read() { + run(®ISTRATION, async { + let a: Vc = Vc::local_cell(42); + assert_eq!(*a.await.unwrap(), 42); + + let b = Wrapper(42).local_cell(); + assert_eq!((*b.await.unwrap()).0, 42); + + let c = TransparentWrapper(42).local_cell(); + assert_eq!(*c.await.unwrap(), 42); + }) + .await +} + +#[tokio::test] +async fn store_and_read_generic() { + run(®ISTRATION, async { + // `Vc>>` is stored as `Vc>>` and requires special + // transmute handling + let cells: Vc>> = + Vc::local_cell(vec![Vc::local_cell(1), Vc::local_cell(2), Vc::cell(3)]); + + let mut output = Vec::new(); + for el in cells.await.unwrap() { + output.push(*el.await.unwrap()); + } + + assert_eq!(output, vec![1, 2, 3]); + }) + .await +} diff --git a/turbopack/crates/turbo-tasks/src/id.rs b/turbopack/crates/turbo-tasks/src/id.rs index 06fff1068d958..6eefce9506067 100644 --- a/turbopack/crates/turbo-tasks/src/id.rs +++ b/turbopack/crates/turbo-tasks/src/id.rs @@ -70,6 +70,7 @@ define_id!(ValueTypeId: u32); define_id!(TraitTypeId: u32); define_id!(BackendJobId: u32); define_id!(ExecutionId: u64, derive(Debug)); +define_id!(LocalCellId: u32, derive(Debug)); impl Debug for TaskId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 17b25c9b4ca12..dfbd6ce3ca65d 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -30,8 +30,8 @@ use crate::{ }, capture_future::{self, CaptureFuture}, event::{Event, EventListener}, - id::{BackendJobId, FunctionId, TraitTypeId}, - id_factory::IdFactoryWithReuse, + id::{BackendJobId, ExecutionId, FunctionId, LocalCellId, TraitTypeId}, + id_factory::{IdFactory, IdFactoryWithReuse}, magic_any::MagicAny, raw_vc::{CellId, RawVc}, registry, @@ -243,6 +243,7 @@ pub struct TurboTasks { this: Weak, backend: B, task_id_factory: IdFactoryWithReuse, + execution_id_factory: IdFactory, stopped: AtomicBool, currently_scheduled_tasks: AtomicUsize, currently_scheduled_foreground_jobs: AtomicUsize, @@ -257,7 +258,6 @@ pub struct TurboTasks { program_start: Instant, } -#[derive(Default)] struct CurrentTaskState { /// Affected tasks, that are tracked during task execution. These tasks will /// be invalidated when the execution finishes or before reading a cell @@ -266,6 +266,26 @@ struct CurrentTaskState { /// True if the current task has state in cells stateful: bool, + + /// A unique identifier created for each unique `CurrentTaskState`. Used to + /// check that [`CurrentTaskState::local_cells`] are valid for the current + /// `RawVc::LocalCell`. + execution_id: ExecutionId, + + /// Cells for locally allocated Vcs (`RawVc::LocalCell`). This is freed + /// (along with `CurrentTaskState`) when the task finishes executing. + local_cells: Vec, +} + +impl CurrentTaskState { + fn new(execution_id: ExecutionId) -> Self { + Self { + tasks_to_notify: Vec::new(), + stateful: false, + execution_id, + local_cells: Vec::new(), + } + } } // TODO implement our own thread pool and make these thread locals instead @@ -293,6 +313,7 @@ impl TurboTasks { this: this.clone(), backend, task_id_factory, + execution_id_factory: IdFactory::new(), stopped: AtomicBool::new(false), currently_scheduled_tasks: AtomicUsize::new(0), currently_scheduled_background_jobs: AtomicUsize::new(0), @@ -488,57 +509,61 @@ impl TurboTasks { let this = self.pin(); let future = async move { - #[allow(clippy::blocks_in_conditions)] - while CURRENT_TASK_STATE - .scope(Default::default(), async { - if this.stopped.load(Ordering::Acquire) { - return false; - } + let mut schedule_again = true; + while schedule_again { + let task_state = + RefCell::new(CurrentTaskState::new(this.execution_id_factory.get())); + schedule_again = CURRENT_TASK_STATE + .scope(task_state, async { + if this.stopped.load(Ordering::Acquire) { + return false; + } - // Setup thread locals - CELL_COUNTERS - .scope(Default::default(), async { - let Some(TaskExecutionSpec { future, span }) = - this.backend.try_start_task_execution(task_id, &*this) - else { - return false; - }; - - async { - let (result, duration, memory_usage) = - CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()) - .await; - - let result = result.map_err(|any| match any.downcast::() { - Ok(owned) => Some(Cow::Owned(*owned)), - Err(any) => match any.downcast::<&'static str>() { - Ok(str) => Some(Cow::Borrowed(*str)), - Err(_) => None, - }, - }); - this.backend.task_execution_result(task_id, result, &*this); - let stateful = this.finish_current_task_state(); - let cell_counters = - CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut())); - let schedule_again = this.backend.task_execution_completed( - task_id, - duration, - memory_usage, - cell_counters, - stateful, - &*this, - ); - // task_execution_completed might need to notify tasks - this.notify_scheduled_tasks(); - schedule_again - } - .instrument(span) + // Setup thread locals + CELL_COUNTERS + .scope(Default::default(), async { + let Some(TaskExecutionSpec { future, span }) = + this.backend.try_start_task_execution(task_id, &*this) + else { + return false; + }; + + async { + let (result, duration, memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()) + .await; + + let result = + result.map_err(|any| match any.downcast::() { + Ok(owned) => Some(Cow::Owned(*owned)), + Err(any) => match any.downcast::<&'static str>() { + Ok(str) => Some(Cow::Borrowed(*str)), + Err(_) => None, + }, + }); + this.backend.task_execution_result(task_id, result, &*this); + let stateful = this.finish_current_task_state(); + let cell_counters = + CELL_COUNTERS.with(|cc| take(&mut *cc.borrow_mut())); + let schedule_again = this.backend.task_execution_completed( + task_id, + duration, + memory_usage, + cell_counters, + stateful, + &*this, + ); + // task_execution_completed might need to notify tasks + this.notify_scheduled_tasks(); + schedule_again + } + .instrument(span) + .await + }) .await - }) - .await - }) - .await - {} + }) + .await; + } this.finish_primary_job(); anyhow::Ok(()) }; @@ -841,6 +866,7 @@ impl TurboTasks { let CurrentTaskState { tasks_to_notify, stateful, + .. } = &mut *cell.borrow_mut(); (*stateful, take(tasks_to_notify)) }); @@ -1692,3 +1718,47 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { } }) } + +pub(crate) fn create_local_cell(value: TypedCellContent) -> (ExecutionId, LocalCellId) { + CURRENT_TASK_STATE.with(|cell| { + let CurrentTaskState { + execution_id, + local_cells, + .. + } = &mut *cell.borrow_mut(); + + // store in the task-local arena + local_cells.push(value); + + // generate a one-indexed id + let raw_local_cell_id = local_cells.len(); + let local_cell_id = if cfg!(debug_assertions) { + LocalCellId::from(u32::try_from(raw_local_cell_id).unwrap()) + } else { + unsafe { LocalCellId::new_unchecked(raw_local_cell_id as u32) } + }; + + (*execution_id, local_cell_id) + }) +} + +/// Panics if the ExecutionId does not match the expected value. +pub(crate) fn read_local_cell( + execution_id: ExecutionId, + local_cell_id: LocalCellId, +) -> TypedCellContent { + CURRENT_TASK_STATE.with(|cell| { + let CurrentTaskState { + execution_id: expected_execution_id, + local_cells, + .. + } = &*cell.borrow(); + assert_eq!( + execution_id, *expected_execution_id, + "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the \ + Vc to convert it into a non-local version." + ); + // local cell ids are one-indexed (they use NonZeroU32) + local_cells[(*local_cell_id as usize) - 1].clone() + }) +} diff --git a/turbopack/crates/turbo-tasks/src/raw_vc.rs b/turbopack/crates/turbo-tasks/src/raw_vc.rs index ec6fa115fc1ad..bf8185c2fd917 100644 --- a/turbopack/crates/turbo-tasks/src/raw_vc.rs +++ b/turbopack/crates/turbo-tasks/src/raw_vc.rs @@ -15,7 +15,8 @@ use thiserror::Error; use crate::{ backend::{CellContent, TypedCellContent}, event::EventListener, - manager::{read_task_cell, read_task_output, TurboTasksApi}, + id::{ExecutionId, LocalCellId}, + manager::{read_local_cell, read_task_cell, read_task_output, TurboTasksApi}, registry::{self, get_value_type}, turbo_tasks, CollectiblesSource, TaskId, TraitTypeId, ValueTypeId, Vc, VcValueTrait, }; @@ -53,6 +54,8 @@ impl Display for CellId { pub enum RawVc { TaskOutput(TaskId), TaskCell(TaskId, CellId), + #[serde(skip)] + LocalCell(ExecutionId, LocalCellId), } impl RawVc { @@ -60,6 +63,7 @@ impl RawVc { match self { RawVc::TaskOutput(_) => false, RawVc::TaskCell(_, _) => true, + RawVc::LocalCell(_, _) => false, } } @@ -120,6 +124,7 @@ impl RawVc { return Err(ResolveTypeError::NoContent); } } + RawVc::LocalCell(_, _) => todo!(), } } } @@ -152,6 +157,7 @@ impl RawVc { return Err(ResolveTypeError::NoContent); } } + RawVc::LocalCell(_, _) => todo!(), } } } @@ -171,6 +177,7 @@ impl RawVc { current = read_task_output(&*tt, task, false).await?; } RawVc::TaskCell(_, _) => return Ok(current), + RawVc::LocalCell(_, _) => todo!(), } } } @@ -190,6 +197,7 @@ impl RawVc { current = read_task_output(&*tt, task, true).await?; } RawVc::TaskCell(_, _) => return Ok(current), + RawVc::LocalCell(_, _) => todo!(), } } } @@ -202,6 +210,7 @@ impl RawVc { pub fn get_task_id(&self) -> TaskId { match self { RawVc::TaskOutput(t) | RawVc::TaskCell(t, _) => *t, + RawVc::LocalCell(_, _) => todo!(), } } } @@ -227,19 +236,6 @@ impl CollectiblesSource for RawVc { } } -impl Display for RawVc { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RawVc::TaskOutput(task) => { - write!(f, "output of {}", task) - } - RawVc::TaskCell(task, index) => { - write!(f, "value {} of {}", index, task) - } - } - } -} - pub struct ReadRawVcFuture { turbo_tasks: Arc, strongly_consistent: bool, @@ -358,6 +354,9 @@ impl Future for ReadRawVcFuture { Err(err) => return Poll::Ready(Err(err)), } } + RawVc::LocalCell(execution_id, local_cell_id) => { + return Poll::Ready(Ok(read_local_cell(execution_id, local_cell_id))); + } }; // SAFETY: listener is from previous pinned this match unsafe { Pin::new_unchecked(&mut listener) }.poll(cx) { diff --git a/turbopack/crates/turbo-tasks/src/vc/mod.rs b/turbopack/crates/turbo-tasks/src/vc/mod.rs index 180a02bc7b3f2..5c10f18769d49 100644 --- a/turbopack/crates/turbo-tasks/src/vc/mod.rs +++ b/turbopack/crates/turbo-tasks/src/vc/mod.rs @@ -25,10 +25,12 @@ pub use self::{ traits::{Dynamic, TypedForInput, Upcast, VcValueTrait, VcValueType}, }; use crate::{ + backend::CellContent, debug::{ValueDebug, ValueDebugFormat, ValueDebugFormatString}, + manager::create_local_cell, registry, trace::{TraceRawVcs, TraceRawVcsContext}, - CellId, CollectiblesSource, RawVc, ResolveTypeError, + CellId, CollectiblesSource, RawVc, ResolveTypeError, SharedReference, }; /// A Value Cell (`Vc` for short) is a reference to a memoized computation @@ -266,9 +268,29 @@ impl Vc where T: VcValueType, { + // called by the `.cell()` method generated by the `#[turbo_tasks::value]` macro #[doc(hidden)] pub fn cell_private(inner: >::Target) -> Self { - >::cell(inner) + >::cell(inner) + } + + // called by the `.local_cell()` method generated by the `#[turbo_tasks::value]` + // macro + #[doc(hidden)] + pub fn local_cell_private(inner: >::Target) -> Self { + // `T::CellMode` isn't applicable here, we always create new local cells. Local + // cells aren't stored across executions, so there can be no concept of + // "updating" the cell across multiple executions. + let (execution_id, local_cell_id) = create_local_cell( + CellContent(Some(SharedReference::new(triomphe::Arc::new( + T::Read::target_to_repr(inner), + )))) + .into_typed(T::get_value_type_id()), + ); + Vc { + node: RawVc::LocalCell(execution_id, local_cell_id), + _t: PhantomData, + } } } @@ -281,6 +303,13 @@ where pub fn cell(inner: Inner) -> Self { >::cell(inner) } + + pub fn local_cell(inner: Inner) -> Self { + // `T::CellMode` isn't applicable here, we always create new local cells. Local + // cells aren't stored across executions, so there can be no concept of + // "updating" the cell across multiple executions. + Self::local_cell_private(inner) + } } impl Vc diff --git a/turbopack/crates/turbo-tasks/src/vc/read.rs b/turbopack/crates/turbo-tasks/src/vc/read.rs index 6c7e06d49c347..5fb7c9fc11ad0 100644 --- a/turbopack/crates/turbo-tasks/src/vc/read.rs +++ b/turbopack/crates/turbo-tasks/src/vc/read.rs @@ -48,6 +48,9 @@ where /// Convert a reference to a target type to a reference to a value. fn target_to_value_ref(target: &Self::Target) -> &T; + /// Convert the target type to the repr. + fn target_to_repr(target: Self::Target) -> Self::Repr; + /// Convert a reference to a repr type to a reference to a value. fn repr_to_value_ref(repr: &Self::Repr) -> &T; } @@ -81,6 +84,10 @@ where target } + fn target_to_repr(target: Self::Target) -> Self::Repr { + target + } + fn repr_to_value_ref(repr: &Self::Repr) -> &T { repr } @@ -133,6 +140,15 @@ where } } + fn target_to_repr(target: Self::Target) -> Self::Repr { + // Safety: see `Self::value_to_target` above. + unsafe { + std::mem::transmute_copy::, Self::Repr>(&ManuallyDrop::new( + target, + )) + } + } + fn repr_to_value_ref(repr: &Self::Repr) -> &T { // Safety: see `Self::value_to_target` above. unsafe {