Skip to content

Commit

Permalink
rt: reduce code defined in macros (#5773)
Browse files Browse the repository at this point in the history
Instead of defining code in macros, move code definition to sub modules
and use the cfg_macro to declare the module.
  • Loading branch information
carllerche authored Jun 7, 2023
1 parent cbb3c15 commit 1c8d22c
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 238 deletions.
75 changes: 8 additions & 67 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use crate::util::RngSeedGenerator;

use std::fmt;

cfg_metrics! {
mod metrics;
}

cfg_taskdump! {
mod taskdump;
}

/// Handle to the multi thread scheduler
pub(crate) struct Handle {
/// Task spawner
Expand Down Expand Up @@ -53,73 +61,6 @@ impl Handle {
}
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.shared.worker_metrics.len()
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
}

cfg_taskdump! {
impl Handle {
pub(crate) async fn dump(&self) -> crate::runtime::Dump {
let trace_status = &self.shared.trace_status;

// If a dump is in progress, block.
trace_status.start_trace_request(&self).await;

let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
trace_status.result_ready.notified().await;
}
};

// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self).await;

result
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
Expand Down
41 changes: 41 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use super::Handle;

use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.shared.worker_metrics.len()
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}

pub(crate) fn num_idle_blocking_threads(&self) -> usize {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.shared.worker_metrics[worker]
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.shared.injection_queue_depth()
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.shared.worker_local_queue_depth(worker)
}

pub(crate) fn blocking_queue_depth(&self) -> usize {
self.blocking_spawner.queue_depth()
}
}
26 changes: 26 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/taskdump.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use super::Handle;

use crate::runtime::Dump;

impl Handle {
pub(crate) async fn dump(&self) -> Dump {
let trace_status = &self.shared.trace_status;

// If a dump is in progress, block.
trace_status.start_trace_request(&self).await;

let result = loop {
if let Some(result) = trace_status.take_result() {
break result;
} else {
self.notify_all();
trace_status.result_ready.notified().await;
}
};

// Allow other queued dumps to proceed.
trace_status.end_trace_request(&self).await;

result
}
}
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ mod worker;
pub(crate) use worker::{Context, Launch, Shared};

cfg_taskdump! {
mod trace;
use trace::TraceStatus;

pub(crate) use worker::Synced;
}

cfg_not_taskdump! {
mod trace_mock;
use trace_mock::TraceStatus;
}

pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
Expand Down
61 changes: 61 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Barrier, Mutex};
use crate::runtime::dump::Dump;
use crate::runtime::scheduler::multi_thread::Handle;
use crate::sync::notify::Notify;

/// Tracing status of the worker.
pub(super) struct TraceStatus {
pub(super) trace_requested: AtomicBool,
pub(super) trace_start: Barrier,
pub(super) trace_end: Barrier,
pub(super) result_ready: Notify,
pub(super) trace_result: Mutex<Option<Dump>>,
}

impl TraceStatus {
pub(super) fn new(remotes_len: usize) -> Self {
Self {
trace_requested: AtomicBool::new(false),
trace_start: Barrier::new(remotes_len),
trace_end: Barrier::new(remotes_len),
result_ready: Notify::new(),
trace_result: Mutex::new(None),
}
}

pub(super) fn trace_requested(&self) -> bool {
self.trace_requested.load(Ordering::Relaxed)
}

pub(super) async fn start_trace_request(&self, handle: &Handle) {
while self
.trace_requested
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
handle.notify_all();
crate::task::yield_now().await;
}
}

pub(super) fn stash_result(&self, dump: Dump) {
let _ = self.trace_result.lock().insert(dump);
self.result_ready.notify_one();
}

pub(super) fn take_result(&self) -> Option<Dump> {
self.trace_result.lock().take()
}

pub(super) async fn end_trace_request(&self, handle: &Handle) {
while self
.trace_requested
.compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
handle.notify_all();
crate::task::yield_now().await;
}
}
}
11 changes: 11 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/trace_mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
pub(super) struct TraceStatus {}

impl TraceStatus {
pub(super) fn new(_: usize) -> Self {
Self {}
}

pub(super) fn trace_requested(&self) -> bool {
false
}
}
Loading

0 comments on commit 1c8d22c

Please sign in to comment.