[Turbopack] Preparations for persisting #70468

Merged · 13 commits · Oct 7, 2024
11 changes: 5 additions & 6 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -179,6 +179,7 @@ proc-macro2 = "1.0.79"
qstring = "0.7.2"
quote = "1.0.23"
rand = "0.8.5"
rayon = "1.10.0"
regex = "1.10.6"
rstest = "0.16.0"
rustc-hash = "1.1.0"
43 changes: 19 additions & 24 deletions crates/next-api/src/project.rs
@@ -216,21 +216,19 @@ impl ProjectContainer
impl ProjectContainer {
#[tracing::instrument(level = "info", name = "initialize project", skip_all)]
pub async fn initialize(self: Vc<Self>, options: ProjectOptions) -> Result<()> {
let poll_interval = options.watch.poll_interval;
let watch = options.watch;

self.await?.options_state.set(Some(options));

let project = self.project();
project
.project_fs()
.strongly_consistent()
.await?
.start_watching_with_invalidation_reason(poll_interval)?;
project
.output_fs()
.strongly_consistent()
.await?
.invalidate_with_reason();
let project_fs = project.project_fs().strongly_consistent().await?;
if watch.enable {
project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
} else {
project_fs.invalidate_with_reason();
}
let output_fs = project.output_fs().strongly_consistent().await?;
output_fs.invalidate_with_reason();
Ok(())
}

@@ -293,20 +291,23 @@ impl ProjectContainer
}

// TODO: Handle mode switch; the mode should be prevented from being switched.
let watch = new_options.watch;

let project = self.project();
let prev_project_fs = project.project_fs().strongly_consistent().await?;
let prev_output_fs = project.output_fs().strongly_consistent().await?;

let poll_interval = new_options.watch.poll_interval;

this.options_state.set(Some(new_options));
let project_fs = project.project_fs().strongly_consistent().await?;
let output_fs = project.output_fs().strongly_consistent().await?;

if !ReadRef::ptr_eq(&prev_project_fs, &project_fs) {
// TODO stop watching: prev_project_fs.stop_watching()?;
project_fs.start_watching_with_invalidation_reason(poll_interval)?;
if watch.enable {
// TODO stop watching: prev_project_fs.stop_watching()?;
project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?;
} else {
project_fs.invalidate_with_reason();
}
}
if !ReadRef::ptr_eq(&prev_output_fs, &output_fs) {
prev_output_fs.invalidate_with_reason();
@@ -533,18 +534,12 @@ impl Project
}

#[turbo_tasks::function]
async fn project_fs(&self) -> Result<Vc<DiskFileSystem>> {
let disk_fs = DiskFileSystem::new(
fn project_fs(&self) -> Vc<DiskFileSystem> {
DiskFileSystem::new(
PROJECT_FILESYSTEM_NAME.into(),
self.root_path.clone(),
vec![],
);
if self.watch.enable {
disk_fs
.await?
.start_watching_with_invalidation_reason(self.watch.poll_interval)?;
}
Ok(disk_fs)
)
}

#[turbo_tasks::function]
10 changes: 5 additions & 5 deletions crates/next-core/src/app_structure.rs
@@ -51,7 +51,7 @@ pub struct AppDirModules
pub default: Option<Vc<FileSystemPath>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub route: Option<Vc<FileSystemPath>>,
#[serde(skip_serializing_if = "Metadata::is_empty")]
#[serde(skip_serializing_if = "Metadata::is_empty", default)]
pub metadata: Metadata,
}

@@ -137,13 +137,13 @@ impl From<MetadataWithAltItem> for MetadataItem
/// Metadata file that can be placed in any segment of the app directory.
#[derive(Default, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, TraceRawVcs)]
pub struct Metadata {
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub icon: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub apple: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub twitter: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub open_graph: Vec<MetadataWithAltItem>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sitemap: Option<MetadataItem>,
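A note on the `default` additions in this file and the next: `skip_serializing_if` omits a field from the serialized output, so without a matching `default` the round-trip breaks at deserialization time with a missing-field error. A minimal standalone sketch of the failure mode these attributes fix (not code from this PR; the `icon: Vec<String>` field is illustrative):

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct Metadata {
    // Without `default`, deserializing the `{}` produced below would fail
    // with a "missing field `icon`" error.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    icon: Vec<String>,
}

fn main() {
    let empty = Metadata::default();
    let json = serde_json::to_string(&empty).unwrap();
    assert_eq!(json, "{}"); // `icon` was skipped entirely
    let back: Metadata = serde_json::from_str(&json).unwrap(); // ok only with `default`
    assert_eq!(empty, back);
}
```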
38 changes: 37 additions & 1 deletion crates/next-core/src/next_manifests/mod.rs
@@ -57,7 +57,7 @@ impl Default for MiddlewaresManifest
Serialize,
Deserialize,
)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", default)]
pub struct MiddlewareMatcher {
// When skipped, Next.js will fill that in during merging.
#[serde(skip_serializing_if = "Option::is_none")]
@@ -308,3 +308,39 @@ pub struct ClientBuildManifest<'a> {
#[serde(flatten)]
pub pages: HashMap<RcStr, Vec<&'a str>>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_middleware_matcher_serialization() {
let matchers = vec![
MiddlewareMatcher {
regexp: None,
locale: false,
has: None,
missing: None,
original_source: "".into(),
},
MiddlewareMatcher {
regexp: Some(".*".into()),
locale: true,
has: Some(vec![RouteHas::Query {
key: "foo".into(),
value: None,
}]),
missing: Some(vec![RouteHas::Query {
key: "bar".into(),
value: Some("value".into()),
}]),
original_source: "source".into(),
},
];

let serialized = serde_json::to_string(&matchers).unwrap();
let deserialized: Vec<MiddlewareMatcher> = serde_json::from_str(&serialized).unwrap();

assert_eq!(matchers, deserialized);
}
}
14 changes: 13 additions & 1 deletion packages/next/src/build/index.ts
@@ -1492,7 +1492,15 @@ export default async function build(
'Building'
)
const promises: Promise<any>[] = []
const sema = new Sema(10)

// Concurrency will start at INITIAL_CONCURRENCY and
// slowly ramp up to CONCURRENCY by increasing the
// concurrency by 1 every time a task is completed.
const INITIAL_CONCURRENCY = 5
const CONCURRENCY = 10

const sema = new Sema(INITIAL_CONCURRENCY)
let remainingRampup = CONCURRENCY - INITIAL_CONCURRENCY
const enqueue = (fn: () => Promise<void>) => {
promises.push(
(async () => {
@@ -1501,6 +1509,10 @@
await fn()
} finally {
sema.release()
if (remainingRampup > 0) {
remainingRampup--
sema.release()
}
progress()
}
})()
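The ramp-up above is TypeScript using async-sema. To keep this writeup's examples in one language, here is a hedged Rust sketch of the same idea using tokio's `Semaphore` (the function shape and names are illustrative, not from the PR): start with INITIAL_CONCURRENCY permits and add one extra permit per completed task until CONCURRENCY is reached.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::Semaphore;

const INITIAL_CONCURRENCY: usize = 5;
const CONCURRENCY: usize = 10;

async fn run_all<F>(tasks: Vec<F>)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    let sema = Arc::new(Semaphore::new(INITIAL_CONCURRENCY));
    let rampup = Arc::new(AtomicUsize::new(CONCURRENCY - INITIAL_CONCURRENCY));
    let mut handles = Vec::new();
    for task in tasks {
        let (sema, rampup) = (sema.clone(), rampup.clone());
        handles.push(tokio::spawn(async move {
            let permit = sema.acquire().await.expect("semaphore closed");
            task.await;
            drop(permit); // give the slot back, like `sema.release()` in the TS diff
            // Grow the pool by one permit per finished task until the target is hit.
            if rampup
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
                .is_ok()
            {
                sema.add_permits(1);
            }
        }));
    }
    for handle in handles {
        handle.await.expect("task panicked");
    }
}
```

After CONCURRENCY - INITIAL_CONCURRENCY completions the semaphore permanently holds CONCURRENCY permits, so completions rather than elapsed time drive the ramp.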
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-fs/Cargo.toml
@@ -39,6 +39,7 @@ jsonc-parser = { version = "0.21.0", features = ["serde"] }
mime = { workspace = true }
notify = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, features = ["rc"] }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks-fs/src/invalidation.rs
@@ -51,7 +51,7 @@ impl InvalidationReasonKind for WatchChangeKind

/// Invalidation was caused by starting to watch a directory that had been
/// read before.
#[derive(PartialEq, Eq, Hash)]
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct WatchStart {
pub name: RcStr,
pub path: RcStr,
100 changes: 57 additions & 43 deletions turbopack/crates/turbo-tasks-fs/src/lib.rs
@@ -48,6 +48,7 @@ use invalidation::InvalidateFilesystem;
use invalidator_map::InvalidatorMap;
use jsonc_parser::{parse_to_serde_value, ParseOptions};
use mime::Mime;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use read_glob::read_glob;
pub use read_glob::ReadGlobResult;
use serde::{Deserialize, Serialize};
@@ -273,30 +274,43 @@ impl DiskFileSystem

pub fn invalidate(&self) {
let _span = tracing::info_span!("invalidate filesystem", path = &*self.root).entered();
for (_, invalidators) in take(&mut *self.invalidator_map.lock().unwrap()).into_iter() {
invalidators.into_iter().for_each(|i| i.invalidate());
}
for (_, invalidators) in take(&mut *self.dir_invalidator_map.lock().unwrap()).into_iter() {
invalidators.into_iter().for_each(|i| i.invalidate());
}
let span = tracing::Span::current();
let handle = tokio::runtime::Handle::current();
let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
let iter = invalidator_map
.into_par_iter()
.chain(dir_invalidator_map.into_par_iter())
.flat_map(|(_, invalidators)| invalidators.into_par_iter());
iter.for_each(|i| {

Review thread on this line:

@kdy1 (Member), Oct 1, 2024:
IIRC, rayon iterators are designed for CPU-heavy tasks and don't perform well for mutex-related things. But I'm not sure if there's a better way.

(Member):
A very recent addition to this space is https://github.com/dragostis/chili

Or we could just call spawn_blocking a bunch of times (once per CPU core?) and reuse tokio's blocking thread pool.

I don't love pulling in a heavy dependency like rayon, but it looks like we're already pulling it in for other reasons so 🤷

(Member, Author):
Yeah, I also dislike using rayon here, but this is CPU-heavy work and tokio doesn't offer parallel iterators. We could rebuild it with spawn_blocking, but that sounds like it would be less efficient than rayon...

[A hedged sketch of the spawn_blocking alternative appears after this hunk.]


let _span = span.clone().entered();
let _guard = handle.enter();
i.invalidate()
});
self.serialization_invalidator.invalidate();
}

pub fn invalidate_with_reason(&self) {
let _span = tracing::info_span!("invalidate filesystem", path = &*self.root).entered();
for (path, invalidators) in take(&mut *self.invalidator_map.lock().unwrap()).into_iter() {
let reason = InvalidateFilesystem { path: path.into() };
invalidators
.into_iter()
.for_each(|i| i.invalidate_with_reason(reason.clone()));
}
for (path, invalidators) in take(&mut *self.dir_invalidator_map.lock().unwrap()).into_iter()
{
let reason = InvalidateFilesystem { path: path.into() };
invalidators
.into_iter()
.for_each(|i| i.invalidate_with_reason(reason.clone()));
}
let span = tracing::Span::current();
let handle = tokio::runtime::Handle::current();
let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
let iter = invalidator_map
.into_par_iter()
.chain(dir_invalidator_map.into_par_iter())
.flat_map(|(path, invalidators)| {
let _span = span.clone().entered();
let reason = InvalidateFilesystem { path: path.into() };
invalidators
.into_par_iter()
.map(move |i| (reason.clone(), i))
});
iter.for_each(|(reason, invalidator)| {
let _span = span.clone().entered();
let _guard = handle.enter();
invalidator.invalidate_with_reason(reason)
});
self.serialization_invalidator.invalidate();
}
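
Aside on the thread above: the rayon version in this diff has to re-enter the tracing span and the tokio runtime handle on each worker (`span.clone().entered()` / `handle.enter()`), because rayon threads start with neither. Below is a minimal sketch of the spawn_blocking alternative the reviewers floated, assuming the invalidators are `Send + 'static` and using a stub in place of turbo-tasks' real `Invalidator`:

```rust
use tokio::task::JoinSet;

// Stub standing in for turbo-tasks' `Invalidator`; only `invalidate` matters here.
struct Invalidator;
impl Invalidator {
    fn invalidate(self) {}
}

// Split the collected invalidators into roughly one chunk per CPU core and
// run each chunk on tokio's blocking thread pool instead of rayon.
async fn invalidate_all(mut invalidators: Vec<Invalidator>) {
    let workers = std::thread::available_parallelism().map_or(1, |n| n.get());
    let chunk_size = invalidators.len().div_ceil(workers).max(1);
    let mut set = JoinSet::new();
    while !invalidators.is_empty() {
        let split_at = invalidators.len().saturating_sub(chunk_size);
        let chunk = invalidators.split_off(split_at);
        // Blocking tasks spawned via tokio run inside the runtime context,
        // so no explicit `Handle::enter` is needed, unlike raw rayon threads.
        set.spawn_blocking(move || {
            for invalidator in chunk {
                invalidator.invalidate();
            }
        });
    }
    while set.join_next().await.is_some() {}
}
```

The trade-off versus rayon: tokio's blocking pool keeps the runtime context available inside each task, at the cost of hand-rolled chunking and no work stealing.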

@@ -333,6 +347,7 @@ impl DiskFileSystem
invalidator_map,
dir_invalidator_map,
poll_interval,
self.serialization_invalidator.clone(),
)?;
self.serialization_invalidator.invalidate();

@@ -441,11 +456,11 @@ impl DiskFileSystem

// we use the sync std function here as it's a lot faster (600%) in
// node-file-trace
let read_dir = match retry_blocking(
&full_path,
tracing::info_span!("read directory", path = display(full_path.display())),
|path| std::fs::read_dir(path),
)
let read_dir = match retry_blocking(&full_path, |path| {
let _span =
tracing::info_span!("read directory", path = display(path.display())).entered();
std::fs::read_dir(path)
})
.await
{
Ok(dir) => dir,
@@ -810,26 +825,25 @@ impl FileSystem for DiskFileSystem
} else {
PathBuf::from(unix_to_sys(target).as_ref())
};
retry_blocking(
&target_path,
tracing::info_span!("write symlink", path = display(full_path.display())),
move |target_path| {
// we use the sync std method here because `symlink` is fast
// if we put it into a task, it will be slower
#[cfg(not(target_family = "windows"))]
{
std::os::unix::fs::symlink(target_path, &full_path)
}
#[cfg(target_family = "windows")]
{
if link_type.contains(LinkType::DIRECTORY) {
std::os::windows::fs::symlink_dir(target_path, &full_path)
} else {
std::os::windows::fs::symlink_file(target_path, &full_path)
}
retry_blocking(&target_path, move |target_path| {
let _span =
tracing::info_span!("write symlink", path = display(target_path.display()))
.entered();
// we use the sync std method here because `symlink` is fast
// if we put it into a task, it will be slower
#[cfg(not(target_family = "windows"))]
{
std::os::unix::fs::symlink(target_path, &full_path)
}
#[cfg(target_family = "windows")]
{
if link_type.contains(LinkType::DIRECTORY) {
std::os::windows::fs::symlink_dir(target_path, &full_path)
} else {
std::os::windows::fs::symlink_file(target_path, &full_path)
}
},
)
}
})
.await
.with_context(|| format!("create symlink to {}", target))?;
}
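The two hunks above change `retry_blocking` call sites so the tracing span is created and entered inside the closure, on the blocking thread, rather than passed in as an argument. The helper itself is not shown in this diff; a hedged sketch of the shape implied by the call sites, with an illustrative retry policy (the crate's actual policy may differ):

```rust
use std::{io, path::Path, time::Duration};

// Sketch only: run a sync filesystem operation on tokio's blocking pool,
// retrying a few times on transient errors. The closure now owns its own
// tracing span, matching the updated call sites above.
async fn retry_blocking<T, F>(path: &Path, f: F) -> io::Result<T>
where
    F: Fn(&Path) -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    let path = path.to_owned();
    tokio::task::spawn_blocking(move || {
        let mut attempts = 0;
        loop {
            match f(&path) {
                // Illustrative: retry only interrupted calls, up to 3 times.
                Err(e) if attempts < 3 && e.kind() == io::ErrorKind::Interrupted => {
                    attempts += 1;
                    std::thread::sleep(Duration::from_millis(10));
                }
                result => return result,
            }
        }
    })
    .await
    .expect("blocking task panicked")
}
```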