Skip to content

Commit

Permalink
rt: move deferred task list to scheduler (#5741)
Browse files Browse the repository at this point in the history
Previously, the deferred task list (list of tasks that yielded and are
waiting to be woken) was stored on the global runtime context. Because
the scheduler is responsible for waking these tasks, it took additional
TLS reads to perform the wake operation.

Instead, this commit moves the list of deferred tasks into the scheduler
context. This makes it easily accessible from the scheduler itself.
  • Loading branch information
carllerche committed Jun 1, 2023
1 parent a96dab1 commit c748f49
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 133 deletions.
53 changes: 14 additions & 39 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ cfg_rt! {
mod scoped;
use scoped::Scoped;

use crate::runtime::{scheduler, task::Id, Defer};
use crate::runtime::{scheduler, task::Id};

use std::cell::RefCell;
use std::marker::PhantomData;
use std::task::Waker;
use std::time::Duration;

cfg_taskdump! {
Expand Down Expand Up @@ -45,11 +46,6 @@ struct Context {
#[cfg(feature = "rt")]
runtime: Cell<EnterRuntime>,

/// Yielded task wakers are stored here and notified after resource drivers
/// are polled.
#[cfg(feature = "rt")]
defer: RefCell<Option<Defer>>,

#[cfg(any(feature = "rt", feature = "macros"))]
rng: FastRand,

Expand Down Expand Up @@ -93,9 +89,6 @@ tokio_thread_local! {
#[cfg(feature = "rt")]
runtime: Cell::new(EnterRuntime::NotEntered),

#[cfg(feature = "rt")]
defer: RefCell::new(None),

#[cfg(any(feature = "rt", feature = "macros"))]
rng: FastRand::new(RngSeed::new()),

Expand Down Expand Up @@ -170,12 +163,6 @@ cfg_rt! {

#[allow(dead_code)] // Only tracking the guard.
pub(crate) handle: SetCurrentGuard,

/// If true, then this is the root runtime guard. It is possible to nest
/// runtime guards by using `block_in_place` between the calls. We need
/// to track the root guard as this is the guard responsible for freeing
/// the deferred task queue.
is_root: bool,
}

/// Guard tracking that a caller has entered a blocking region.
Expand Down Expand Up @@ -240,20 +227,9 @@ cfg_rt! {
// Set the entered flag
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });

// Initialize queue to track yielded tasks
let mut defer = c.defer.borrow_mut();

let is_root = if defer.is_none() {
*defer = Some(Defer::new());
true
} else {
false
};

Some(EnterRuntimeGuard {
blocking: BlockingRegionGuard::new(),
handle: c.set_current(handle),
is_root,
})
}
})
Expand Down Expand Up @@ -292,11 +268,17 @@ cfg_rt! {
DisallowBlockInPlaceGuard(reset)
}

pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
CONTEXT.with(|c| {
let mut defer = c.defer.borrow_mut();
defer.as_mut().map(f)
})
#[track_caller]
pub(crate) fn defer(waker: &Waker) {
with_scheduler(|maybe_scheduler| {
if let Some(scheduler) = maybe_scheduler {
scheduler.defer(waker);
} else {
// Called from outside of the runtime, immediately wake the
// task.
waker.wake_by_ref();
}
});
}

pub(super) fn set_scheduler<R>(v: &scheduler::Context, f: impl FnOnce() -> R) -> R {
Expand Down Expand Up @@ -342,10 +324,6 @@ cfg_rt! {
CONTEXT.with(|c| {
assert!(c.runtime.get().is_entered());
c.runtime.set(EnterRuntime::NotEntered);

if self.is_root {
*c.defer.borrow_mut() = None;
}
});
}
}
Expand All @@ -354,6 +332,7 @@ cfg_rt! {
fn new() -> BlockingRegionGuard {
BlockingRegionGuard { _p: PhantomData }
}

/// Blocks the thread on the specified future, returning the value with
/// which that future completes.
pub(crate) fn block_on<F>(&mut self, f: F) -> Result<F::Output, AccessError>
Expand Down Expand Up @@ -397,10 +376,6 @@ cfg_rt! {
return Err(());
}

// Wake any yielded tasks before parking in order to avoid
// blocking.
with_defer(|defer| defer.wake());

park.park_timeout(when - now);
}
}
Expand Down
38 changes: 0 additions & 38 deletions tokio/src/runtime/defer.rs

This file was deleted.

3 changes: 0 additions & 3 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,6 @@ cfg_rt! {
pub use crate::util::rand::RngSeed;
}

mod defer;
pub(crate) use defer::Defer;

cfg_taskdump! {
pub mod dump;
pub use dump::Dump;
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,6 @@ impl CachedParkThread {
return Ok(v);
}

// Wake any yielded tasks before parking in order to avoid
// blocking.
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());

self.park();
}
}
Expand Down
50 changes: 26 additions & 24 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer};
use crate::runtime::task::{self, Inject, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, context, scheduler, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
Expand All @@ -15,6 +15,7 @@ use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::time::Duration;

/// Executes tasks on the current thread
Expand Down Expand Up @@ -98,6 +99,9 @@ pub(crate) struct Context {
/// Scheduler core, enabling the holder of `Context` to execute the
/// scheduler.
core: RefCell<Option<Box<Core>>>,

/// Deferred tasks, usually ones that called `task::yield_now()`.
pub(crate) defer: Defer,
}

type Notified = task::Notified<Arc<Handle>>;
Expand Down Expand Up @@ -201,6 +205,7 @@ impl CurrentThread {
context: scheduler::Context::CurrentThread(Context {
handle: handle.clone(),
core: RefCell::new(Some(core)),
defer: Defer::new(),
}),
scheduler: self,
})
Expand Down Expand Up @@ -320,21 +325,11 @@ impl Core {
}
}

fn did_defer_tasks() -> bool {
context::with_defer(|deferred| !deferred.is_empty()).unwrap()
}

fn wake_deferred_tasks() {
context::with_defer(|deferred| deferred.wake());
}

#[cfg(tokio_taskdump)]
fn wake_deferred_tasks_and_free() {
let wakers = context::with_defer(|deferred| deferred.take_deferred());
if let Some(wakers) = wakers {
for waker in wakers {
waker.wake();
}
fn wake_deferred_tasks_and_free(context: &Context) {
let wakers = context.defer.take_deferred();
for waker in wakers {
waker.wake();
}
}

Expand Down Expand Up @@ -372,7 +367,7 @@ impl Context {

let (c, _) = self.enter(core, || {
driver.park(&handle.driver);
wake_deferred_tasks();
self.defer.wake();
});

core = c;
Expand All @@ -398,7 +393,7 @@ impl Context {

let (mut core, _) = self.enter(core, || {
driver.park_timeout(&handle.driver, Duration::from_millis(0));
wake_deferred_tasks();
self.defer.wake();
});

core.driver = Some(driver);
Expand All @@ -418,6 +413,10 @@ impl Context {
let core = self.core.borrow_mut().take().expect("core missing");
(core, ret)
}

pub(crate) fn defer(&self, waker: &Waker) {
self.defer.defer(waker);
}
}

// ===== impl Handle =====
Expand Down Expand Up @@ -479,12 +478,15 @@ impl Handle {
.into_iter()
.map(dump::Task::new)
.collect();
});

// Taking a taskdump could wakes every task, but we probably don't want
// the `yield_now` vector to be that large under normal circumstances.
// Therefore, we free its allocation.
wake_deferred_tasks_and_free();
// Avoid double borrow panic
drop(maybe_core);

// Taking a taskdump could wakes every task, but we probably don't want
// the `yield_now` vector to be that large under normal circumstances.
// Therefore, we free its allocation.
wake_deferred_tasks_and_free(context);
});

dump::Dump::new(traces)
}
Expand Down Expand Up @@ -671,7 +673,7 @@ impl CoreGuard<'_> {
None => {
core.metrics.end_processing_scheduled_tasks();

core = if did_defer_tasks() {
core = if !context.defer.is_empty() {
context.park_yield(core, handle)
} else {
context.park(core, handle)
Expand Down
43 changes: 43 additions & 0 deletions tokio/src/runtime/scheduler/defer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::cell::RefCell;
use std::task::Waker;

pub(crate) struct Defer {
deferred: RefCell<Vec<Waker>>,
}

impl Defer {
pub(crate) fn new() -> Defer {
Defer {
deferred: Default::default(),
}
}

pub(crate) fn defer(&self, waker: &Waker) {
let mut deferred = self.deferred.borrow_mut();

// If the same task adds itself a bunch of times, then only add it once.
if let Some(last) = deferred.last() {
if last.will_wake(waker) {
return;
}
}

deferred.push(waker.clone());
}

pub(crate) fn is_empty(&self) -> bool {
self.deferred.borrow().is_empty()
}

pub(crate) fn wake(&self) {
while let Some(waker) = self.deferred.borrow_mut().pop() {
waker.wake();
}
}

#[cfg(tokio_taskdump)]
pub(crate) fn take_deferred(&self) -> Vec<Waker> {
let mut deferred = self.deferred.borrow_mut();
std::mem::take(&mut *deferred)
}
}
12 changes: 12 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
cfg_rt! {
pub(crate) mod current_thread;
pub(crate) use current_thread::CurrentThread;

mod defer;
use defer::Defer;
}

cfg_rt_multi_thread! {
Expand Down Expand Up @@ -56,6 +59,7 @@ cfg_rt! {
use crate::runtime::context;
use crate::task::JoinHandle;
use crate::util::RngSeedGenerator;
use std::task::Waker;

impl Handle {
#[track_caller]
Expand Down Expand Up @@ -203,6 +207,14 @@ cfg_rt! {
}
}

pub(crate) fn defer(&self, waker: &Waker) {
match self {
Context::CurrentThread(context) => context.defer(waker),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Context::MultiThread(context) => context.defer(waker),
}
}

cfg_rt_multi_thread! {
#[track_caller]
pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
Expand Down
Loading

0 comments on commit c748f49

Please sign in to comment.