Skip to content

Commit

Permalink
rt(alt): track which workers are idle. (#5886)
Browse files Browse the repository at this point in the history
The scheduler uses this map to avoid trying to steal from idle workers.
  • Loading branch information
carllerche committed Jul 21, 2023
1 parent 4165601 commit a58beb3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 27 deletions.
43 changes: 18 additions & 25 deletions tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(super) struct Idle {
num_idle: AtomicUsize,

/// Map of idle cores
// idle_map: IdleMap,
idle_map: IdleMap,

/// Used to catch false-negatives when waking workers
needs_searching: AtomicBool,
Expand All @@ -30,7 +30,7 @@ pub(super) struct IdleMap {
}

pub(super) struct Snapshot {
// chunks: Vec<usize>,
chunks: Vec<usize>,
}

/// Data synchronized by the scheduler mutex
Expand All @@ -47,7 +47,7 @@ impl Idle {
let idle = Idle {
num_searching: AtomicUsize::new(0),
num_idle: AtomicUsize::new(cores.len()),
// idle_map: IdleMap::new(&cores),
idle_map: IdleMap::new(&cores),
needs_searching: AtomicBool::new(false),
num_cores: cores.len(),
};
Expand All @@ -69,22 +69,22 @@ impl Idle {
self.num_searching.load(Acquire)
}

pub(super) fn snapshot(&self, _snapshot: &mut Snapshot) {
// snapshot.update(&self.idle_map)
pub(super) fn snapshot(&self, snapshot: &mut Snapshot) {
snapshot.update(&self.idle_map)
}

/// Try to acquire an available core
pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> {
let ret = synced.available_cores.pop();

if let Some(_core) = &ret {
if let Some(core) = &ret {
// Decrement the number of idle cores
let num_idle = self.num_idle.load(Acquire) - 1;
debug_assert_eq!(num_idle, synced.available_cores.len());
self.num_idle.store(num_idle, Release);

// self.idle_map.unset(core.index);
// debug_assert!(self.idle_map.matches(&synced.available_cores));
self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.available_cores));
}

ret
Expand Down Expand Up @@ -151,8 +151,8 @@ impl Idle {
debug_assert!(!core.is_searching);
core.is_searching = true;

// self.idle_map.unset(core.index);
// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Idle {
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);

// self.idle_map.unset(core.index);
self.idle_map.unset(core.index);

synced.assigned_cores[worker] = Some(core);

Expand All @@ -217,7 +217,7 @@ impl Idle {
}

if !workers.is_empty() {
// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
let num_idle = synced.idle.available_cores.len();
self.num_idle.store(num_idle, Release);
} else {
Expand All @@ -237,7 +237,7 @@ impl Idle {
let worker = synced.idle.sleepers.pop().unwrap();
let core = synced.idle.available_cores.pop().unwrap();

// self.idle_map.unset(core.index);
self.idle_map.unset(core.index);

synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();
Expand All @@ -246,7 +246,7 @@ impl Idle {
.store(synced.idle.available_cores.len(), Release);
}

// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

// Wake up any other workers
while let Some(index) = synced.idle.sleepers.pop() {
Expand All @@ -267,12 +267,12 @@ impl Idle {
let num_idle = synced.idle.available_cores.len();
debug_assert_eq!(num_idle, self.num_idle.load(Acquire));

// self.idle_map.set(core.index);
self.idle_map.set(core.index);

// Store the core in the list of available cores
synced.idle.available_cores.push(core);

// debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

// Update `num_idle`
self.num_idle.store(num_idle + 1, Release);
Expand Down Expand Up @@ -386,25 +386,19 @@ impl IdleMap {
}

impl Snapshot {
pub(crate) fn new(_idle: &Idle) -> Snapshot {
/*
pub(crate) fn new(idle: &Idle) -> Snapshot {
let chunks = vec![0; idle.idle_map.chunks.len()];
let mut ret = Snapshot { chunks };
ret.update(&idle.idle_map);
ret
*/
Snapshot {}
}

fn update(&mut self, _idle_map: &IdleMap) {
/*
fn update(&mut self, idle_map: &IdleMap) {
for i in 0..self.chunks.len() {
self.chunks[i] = idle_map.chunks[i].load(Acquire);
}
*/
}

/*
pub(super) fn is_idle(&self, index: usize) -> bool {
let (chunk, mask) = index_to_mask(index);
debug_assert!(
Expand All @@ -415,7 +409,6 @@ impl Snapshot {
);
self.chunks[chunk] & mask == mask
}
*/
}

fn num_chunks(max_cores: usize) -> usize {
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,12 +865,10 @@ impl Worker {
continue;
}

/*
// If the core is currently idle, then there is nothing to steal.
if self.idle_snapshot.is_idle(i) {
continue;
}
*/

let target = &cx.shared().remotes[i];

Expand Down

0 comments on commit a58beb3

Please sign in to comment.