
[Merged by Bors] - Basic adaptive batching for parallel query iteration #4777

61 commits
65bd41f
Remove task_pool parameter from par_for_each(_mut)
james7132 May 9, 2022
b110039
Fix benchmarks
james7132 May 9, 2022
cf10758
Embed task pool into QueryState
james7132 May 9, 2022
076db46
Remove the clone
james7132 May 9, 2022
12eefa1
Update docs
james7132 May 9, 2022
3c76af9
Merge branch 'main' into parallel-ergonomics
james7132 May 14, 2022
332e851
Update docs.
james7132 May 16, 2022
186fd50
Update docs.
james7132 May 16, 2022
88044d1
Update docs.
james7132 May 16, 2022
704cf61
Merge branch 'main' into parallel-ergonomics
james7132 May 16, 2022
cedd756
Merge branch 'main' into parallel-ergonomics
james7132 May 16, 2022
1f94913
Basic adaptive batching
james7132 May 17, 2022
5b3a730
Fix CI
james7132 May 17, 2022
8b365f4
Add par_iter impls
james7132 May 17, 2022
21d95cb
Merge branch 'parallel-ergonomics' into adaptive-batching
james7132 May 17, 2022
92af1f4
Fix CI
james7132 May 17, 2022
2309ab7
Add default batches per thread
james7132 May 21, 2022
f788a7a
Merge branch 'main' into adaptive-batching
james7132 May 30, 2022
6fd0cf1
Formatting
james7132 May 30, 2022
10904c1
Merge branch 'main' into adaptive-batching
james7132 Jun 15, 2022
ac4c524
Merge branch 'main' into adaptive-batching
james7132 Jun 16, 2022
0b8c1eb
Update example comments
james7132 Jun 20, 2022
3674f10
Add documentation comments to `bevy_window` (#4333)
arnavc52 Jun 16, 2022
08bf88b
bevy_render: Fix KTX2 UASTC format mapping (#4569)
superdump Jun 17, 2022
3674e19
update hashbrown to 0.12 (#5035)
mockersf Jun 17, 2022
619bdb9
WGSL: use correct syntax for matrix access (#5039)
mockersf Jun 18, 2022
65a6c9a
Implement `Eq` and `PartialEq` for `MouseScrollUnit` (#5048)
frewsxcv Jun 19, 2022
8959c2d
enable optional dependencies to stay optional (#5023)
mockersf Jun 20, 2022
9a8e5fd
gltf: do not import IoTaskPool in wasm (#5038)
mockersf Jun 20, 2022
03ffbe8
Physical viewport calculation fix (#5055)
aevyrie Jun 20, 2022
b89d878
Cleanups in diagnostics (#3871)
mockersf Jun 20, 2022
d3b997b
`bevy_reflect`: put `serialize` into external `ReflectSerialize` type…
jakobhellermann Jun 20, 2022
c5df0d6
Add benchmarks for schedule dependency resolution (#4961)
JoJoJet Jun 20, 2022
d5a5993
change panicking test to not run on global task pool (#4998)
hymm Jun 20, 2022
606635f
Add a `release_all` function to `Input`. (#5011)
Hoidigan Jun 20, 2022
5f6a290
Update `clap` to 3.2 in tools using `value_parser` (#5031)
mlodato517 Jun 20, 2022
ce10028
Fix redundant "have" in CONTRIBUTING (#5036)
mlodato517 Jun 20, 2022
ec9a481
Add `Input::reset_all` (#5015)
Hoidigan Jun 20, 2022
d74a318
Fix Nix section of linux_dependencies.md (#5050)
fluunke Jun 20, 2022
2381ba2
Fixed bevy_ui touch input (#4099)
ManevilleF Jun 20, 2022
d025d03
Improve entity and component API docs (#4767)
Nilirad Jun 21, 2022
4132b60
Change check_visibility to use thread-local queues instead of a chann…
james7132 Jun 21, 2022
34ae6ba
Mark mutable APIs under ECS storage as pub(crate) (#5065)
james7132 Jun 21, 2022
1eaee67
Callable PBR functions (#4939)
superdump Jun 21, 2022
f3eef7f
depend on dioxus(and bevy)-maintained fork of stretch (taffy) (#4716)
colepoirier Jun 21, 2022
4ab1465
Make the batch size more configurable
james7132 Jun 22, 2022
7bd1617
Allow reusing the same ParIter
james7132 Jun 22, 2022
8bc37a0
More complete docs
james7132 Jun 22, 2022
bc2c649
Merge branch 'main' into adaptive-batching
james7132 Jun 22, 2022
b57d547
More CI fixes
james7132 Jun 22, 2022
5556377
Merge branch 'main' into adaptive-batching
james7132 Nov 14, 2022
a82ff07
Fix CI
james7132 Nov 14, 2022
fd8fefa
Defer to for_each if there is zero or one threads
james7132 Nov 14, 2022
a751055
Merge branch 'main' into adaptive-batching
james7132 Dec 29, 2022
9111a00
Fix CI
james7132 Dec 29, 2022
266bfce
Merge branch 'main' into adaptive-batching
james7132 Jan 6, 2023
2cfcb16
Fix build
james7132 Jan 6, 2023
73e5dfc
Merge branch 'main' into adaptive-batching
james7132 Jan 18, 2023
acf2f5b
Formatting
james7132 Jan 18, 2023
015e201
Add documentation for BatchingStrategy
james7132 Jan 18, 2023
c6363ea
Merge branch 'main' into adaptive-batching
james7132 Jan 20, 2023
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/heavy_compute.rs
@@ -34,7 +34,7 @@ pub fn heavy_compute(c: &mut Criterion) {
     }));
 
     fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
-        query.par_for_each_mut(128, |(mut pos, mut mat)| {
+        query.par_iter_mut().for_each_mut(|(mut pos, mut mat)| {
             for _ in 0..100 {
                 mat.0 = mat.0.inverse();
             }
30 changes: 16 additions & 14 deletions crates/bevy_animation/src/lib.rs
@@ -352,20 +352,22 @@ pub fn animation_player(
     parents: Query<(Option<With<AnimationPlayer>>, Option<&Parent>)>,
     mut animation_players: Query<(Entity, Option<&Parent>, &mut AnimationPlayer)>,
 ) {
-    animation_players.par_for_each_mut(10, |(root, maybe_parent, mut player)| {
-        update_transitions(&mut player, &time);
-        run_animation_player(
-            root,
-            player,
-            &time,
-            &animations,
-            &names,
-            &transforms,
-            maybe_parent,
-            &parents,
-            &children,
-        );
-    });
+    animation_players
+        .par_iter_mut()
+        .for_each_mut(|(root, maybe_parent, mut player)| {
+            update_transitions(&mut player, &time);
+            run_animation_player(
+                root,
+                player,
+                &time,
+                &animations,
+                &names,
+                &transforms,
+                maybe_parent,
+                &parents,
+                &children,
+            );
+        });
 }
 
 #[allow(clippy::too_many_arguments)]
12 changes: 6 additions & 6 deletions crates/bevy_ecs/src/lib.rs
@@ -400,7 +400,8 @@ mod tests {
         let results = Arc::new(Mutex::new(Vec::new()));
         world
             .query::<(Entity, &A)>()
-            .par_for_each(&world, 2, |(e, &A(i))| {
+            .par_iter(&world)
+            .for_each(|(e, &A(i))| {
                 results.lock().unwrap().push((e, i));
             });
         results.lock().unwrap().sort();
@@ -420,11 +421,10 @@
         let e4 = world.spawn((SparseStored(4), A(1))).id();
         let e5 = world.spawn((SparseStored(5), A(1))).id();
         let results = Arc::new(Mutex::new(Vec::new()));
-        world.query::<(Entity, &SparseStored)>().par_for_each(
-            &world,
-            2,
-            |(e, &SparseStored(i))| results.lock().unwrap().push((e, i)),
-        );
+        world
+            .query::<(Entity, &SparseStored)>()
+            .par_iter(&world)
+            .for_each(|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)));
         results.lock().unwrap().sort();
         assert_eq!(
             &*results.lock().unwrap(),
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/query/mod.rs
@@ -2,12 +2,14 @@ mod access;
 mod fetch;
 mod filter;
 mod iter;
+mod par_iter;
 mod state;
 
 pub use access::*;
 pub use fetch::*;
 pub use filter::*;
 pub use iter::*;
+pub use par_iter::*;
 pub use state::*;
 
 /// A debug checked version of [`Option::unwrap_unchecked`]. Will panic in
202 changes: 202 additions & 0 deletions crates/bevy_ecs/src/query/par_iter.rs
@@ -0,0 +1,202 @@
use crate::world::World;
use bevy_tasks::ComputeTaskPool;
use std::ops::Range;

use super::{QueryItem, QueryState, ROQueryItem, ReadOnlyWorldQuery, WorldQuery};

/// Dictates how a parallel query chunks up large tables/archetypes
/// during iteration.
///
/// A parallel query will chunk up large tables and archetypes into
/// chunks of at most a certain batch size.
///
/// By default, this batch size is automatically determined by dividing
/// the size of the largest matched archetype by the number
/// of threads. This attempts to minimize the overhead of scheduling
/// tasks onto multiple threads, but assumes each entity has roughly the
/// same amount of work to be done, which may not hold true in every
/// workload.
///
/// See [`Query::par_iter`] for more information.
///
/// [`Query::par_iter`]: crate::system::Query::par_iter
#[derive(Clone)]
pub struct BatchingStrategy {
    /// The upper and lower limits for how large a batch of entities can be.
    ///
    /// Setting the bounds to the same value will result in a fixed
    /// batch size.
    ///
    /// Defaults to `[1, usize::MAX]`.
    pub batch_size_limits: Range<usize>,
    /// The number of batches per thread in the [`ComputeTaskPool`].
    /// Increasing this value will decrease the batch size, which may
    /// increase the scheduling overhead for the iteration.
    ///
    /// Defaults to 1.
    pub batches_per_thread: usize,
}

impl BatchingStrategy {
    /// Creates a new unconstrained default batching strategy.
    pub const fn new() -> Self {
        Self {
            batch_size_limits: 1..usize::MAX,
            batches_per_thread: 1,
        }
    }

    /// Declares a batching strategy with a fixed batch size.
    pub const fn fixed(batch_size: usize) -> Self {
        Self {
            batch_size_limits: batch_size..batch_size,
            batches_per_thread: 1,
        }
    }

    /// Configures the minimum allowed batch size of this instance.
    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.start = batch_size;
        self
    }

    /// Configures the maximum allowed batch size of this instance.
    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.end = batch_size;
        self
    }

    /// Configures the number of batches to assign to each thread.
    ///
    /// # Panics
    /// Panics if `batches_per_thread` is zero.
    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
        assert!(
            batches_per_thread > 0,
            "The number of batches per thread must be non-zero."
        );
        self.batches_per_thread = batches_per_thread;
        self
    }
}

/// A parallel iterator over query results of a [`Query`](crate::system::Query).
///
/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
pub struct QueryParIter<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> {
    pub(crate) world: &'w World,
    pub(crate) state: &'s QueryState<Q, F>,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, Q: ReadOnlyWorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
    /// Runs `func` on each query result in parallel.
    ///
    /// This can only be called for read-only queries; see [`Self::for_each_mut`] for
    /// write-queries.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(&self, func: FN) {
        // SAFETY: query is read only
        unsafe {
            self.for_each_unchecked(func);
        }
    }
}

impl<'w, 's, Q: WorldQuery, F: ReadOnlyWorldQuery> QueryParIter<'w, 's, Q, F> {
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(&mut self, func: FN) {
        // SAFETY: query has unique world access
        unsafe {
            self.for_each_unchecked(func);
        }
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// Panics if the [`ComputeTaskPool`] is not initialized. If using this from a query
    /// that is being initialized and run from the ECS scheduler, this should never panic.
    ///
    /// # Safety
    ///
    /// This does not check for mutable query correctness. To be safe, make sure mutable queries
    /// have unique access to the components they query.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
        &self,
        func: FN,
    ) {
        let thread_count = ComputeTaskPool::get().thread_num();
        if thread_count <= 1 {
            self.state.for_each_unchecked_manual(
                self.world,
                func,
                self.world.last_change_tick(),
                self.world.read_change_tick(),
            );
        } else {
            // Need a batch size of at least 1.
            let batch_size = self.get_batch_size(thread_count).max(1);
            self.state.par_for_each_unchecked_manual(
                self.world,
                batch_size,
                func,
                self.world.last_change_tick(),
                self.world.read_change_tick(),
            );
        }
    }

    fn get_batch_size(&self, thread_count: usize) -> usize {
        // An empty range (start == end) models a fixed batch size.
        if self.batching_strategy.batch_size_limits.is_empty() {
            return self.batching_strategy.batch_size_limits.start;
        }

        assert!(
            thread_count > 0,
            "Attempted to run parallel iteration over a query with an empty TaskPool"
        );
        let max_size = if Q::IS_DENSE && F::IS_DENSE {
            let tables = &self.world.storages().tables;
            self.state
                .matched_table_ids
                .iter()
                .map(|id| tables[*id].entity_count())
                .max()
                .unwrap_or(0)
        } else {
            let archetypes = &self.world.archetypes();
            self.state
                .matched_archetype_ids
                .iter()
                .map(|id| archetypes[*id].len())
                .max()
                .unwrap_or(0)
        };
        let batch_size = max_size / (thread_count * self.batching_strategy.batches_per_thread);
        batch_size.clamp(
            self.batching_strategy.batch_size_limits.start,
            self.batching_strategy.batch_size_limits.end,
        )
    }
}
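The batch-size arithmetic in `get_batch_size` can be illustrated in isolation. The sketch below is not the Bevy API itself; `compute_batch_size` and its parameters are hypothetical stand-ins for the fields of `BatchingStrategy` and the largest matched table/archetype, showing how the divide-then-clamp logic behaves under the PR's defaults.

```rust
use std::ops::Range;

/// Illustrative sketch (hypothetical helper, not part of bevy_ecs) of the
/// PR's batch-size derivation: divide the largest matched table/archetype
/// by `threads * batches_per_thread`, then clamp to the configured limits.
fn compute_batch_size(
    max_entity_count: usize,
    thread_count: usize,
    batches_per_thread: usize,
    limits: Range<usize>, // start = min batch size, end = max batch size
) -> usize {
    // An empty range (start == end) models BatchingStrategy::fixed.
    if limits.is_empty() {
        return limits.start;
    }
    assert!(thread_count > 0, "cannot batch over an empty task pool");
    let batch_size = max_entity_count / (thread_count * batches_per_thread);
    // Need a batch size of at least 1, as in for_each_unchecked.
    batch_size.clamp(limits.start.max(1), limits.end)
}

fn main() {
    // 10_000 entities over 8 threads, 1 batch per thread -> 1250 per batch.
    println!("{}", compute_batch_size(10_000, 8, 1, 1..usize::MAX));
}
```

Note how a tiny archetype on a large pool (e.g. 10 entities on 16 threads) degenerates to batches of 1, which is why the PR defers to plain `for_each` when `thread_count <= 1` and why raising `min_batch_size` can pay off for cheap per-entity work.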