
EMA based statistically adaptive thread pool design #108

Closed
Changes from 15 commits
38 changes: 38 additions & 0 deletions benches/blocking.rs
@@ -0,0 +1,38 @@
#![feature(test)]

extern crate test;

use async_std::task;
use async_std::task::blocking::JoinHandle;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
use test::Bencher;

// Benchmark for a 10K burst task spawn
#[bench]
fn blocking(b: &mut Bencher) {
b.iter(|| {
let handles = (0..10_000)
.map(|_| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
})
.collect::<Vec<JoinHandle<()>>>();

task::block_on(join_all(handles));
});
}

// Benchmark for a single blocking task spawn
#[bench]
fn blocking_single(b: &mut Bencher) {
b.iter(|| {
task::blocking::spawn(async {
let duration = Duration::from_millis(1);
thread::sleep(duration);
})
});
}
238 changes: 215 additions & 23 deletions src/task/blocking.rs
@@ -1,5 +1,32 @@
//! A thread pool for running blocking functions asynchronously.
//!
//! The blocking thread pool consists of four components:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//! * Time-based Downscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread performs this sampling every 200 milliseconds,
//! and the sampled value is used in the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a given number of frequency samples to create an estimation.
//! This pool holds 10 frequencies at a time.
//! Estimation and prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! The algorithm is altered and adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61).
//!
//! ## Predictive Upscaler
//! Selects the upscaling amount based on the estimation, or, when throughput hogs occur, based on the number of tasks mapped.
//! Throughput hogs are determined by a combination of the job in/job out frequency and the current scheduler task assignment frequency.
//!
//! ## Time-based Downscaler
//! Dynamic threads spawned by the upscaler stay alive for between 1 and 10 seconds.
//! When a thread receives no work from the channel within that window, it terminates and leaves the pool.
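The EWMA used by the trend estimator can be exercised standalone. The sketch below is illustrative, not part of the diff: the `ema` helper and its newest-first sample ordering are assumptions of this example, mirroring how the pool folds its sliding window.

```rust
// Limited-window EMA with smoothing factor alpha = 2 / (N + 1).
// `samples` is ordered newest-first, matching the fold in the pool's
// `calculate_ema`.
fn ema(samples: &[f64], alpha: f64) -> f64 {
    samples
        .iter()
        .enumerate()
        .map(|(i, y)| y * (1.0 - alpha).powi(i as i32))
        .sum::<f64>()
        * alpha
}

fn main() {
    let alpha = 2.0 / (10.0 + 1.0); // window of 10 samples, as in the pool
    // A rising frequency trend yields a larger EMA than a falling one,
    // which is exactly the signal the upscaler keys off.
    let rising = ema(&[3.0, 2.0, 1.0], alpha); // newest sample is 3
    let falling = ema(&[1.0, 2.0, 3.0], alpha); // newest sample is 1
    assert!(rising > falling);
    println!("rising EMA = {:.4}, falling EMA = {:.4}", rising, falling);
}
```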

use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -10,21 +37,44 @@ use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;

use crate::future::Future;
use crate::io::ErrorKind;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;
use std::sync::{Arc, Mutex};

const MAX_THREADS: u64 = 10_000;
/// Low watermark value, defines the bare minimum of the pool.
/// Spawns initial thread set.
const LOW_WATERMARK: u64 = 2;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
/// Pool manager's poll interval (milliseconds).
/// This is the interval at which the adaptation calculation runs.
const MANAGER_POLL_INTERVAL: u64 = 200;

/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

/// Pool task frequency variable.
/// Counts the tasks scheduled onto the thread pool during the current calculation window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

/// Possible max threads (without OS contract).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);

/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<async_task::Task<()>>,
receiver: Receiver<async_task::Task<()>>,
}

lazy_static! {
/// Blocking pool with static starting thread count.
static ref POOL: Pool = {
for _ in 0..2 {
for _ in 0..LOW_WATERMARK {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| abort_on_panic(|| {
@@ -35,6 +85,19 @@ lazy_static! {
.expect("cannot start a thread driving blocking tasks");
}

// Pool manager to check frequency of task rates
// and take action by scaling the pool accordingly.
thread::Builder::new()
.name("async-pool-manager".to_string())
.spawn(|| abort_on_panic(|| {
let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL);
loop {
scale_pool();
thread::sleep(poll_interval);
}
}))
.expect("thread pool manager cannot be started");

// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
@@ -45,20 +108,125 @@ lazy_static! {
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};

/// Sliding window for pool task frequency calculation
static ref FREQ_QUEUE: Arc<Mutex<VecDeque<u64>>> = {
Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1)))
};

/// Dynamic pool thread count variable
static ref POOL_SIZE: Arc<Mutex<u64>> = Arc::new(Mutex::new(LOW_WATERMARK));
}

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
// even if it's random.
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
if workers >= MAX_THREADS {
return;
/// Exponentially Weighted Moving Average calculation
///
/// This calculates the EMA value, which represents the trend of tasks
/// mapped onto the thread pool. The symbols used are:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier      | Explanation                      |
/// +--------+-----------------+----------------------------------+
/// | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt     | freq            | frequency sample at time t       |
/// | St     | acc             | EMA at time t                    |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions, the formula is:
/// ```text
/// St = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 + ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
#[inline]
spacejam (Contributor) commented on Sep 2, 2019:

in general, humans are bad at guessing whether something should be inlined. Usually in rust it's better to leave these attributes out unless some code is going to be executed in a super hot path and you've done work to ensure the improvement is measurable

vertexclique (Member, Author) replied:

This comment is not contributing to the overall discussion + review. I’ve found it quite a bit out of context and also from the book. I've made the benchmarks already with and without and decided to leave it inlined. You can see below my last run again after your comment (a run shouldn't be needed because of how the compiler inlines and unrolls):

Mean of 10 consecutive runs of the blocking benchmark:

43057207.9 ns without inlining
41816626.1 ns with inlining

That is also better for users of this library indirectly.

spacejam (Contributor) replied on Sep 2, 2019:

I'm a bit skeptical that the change is related to the inline here, because this is a function that gets called 5 times per second, and that measurement skew is ~1ms. For a workload that lasts ~40ms, it's not clear to me how measuring things that happen on the order of once every 200ms is relevant.

I'm asking you to simplify your proposed code because we are going to have to keep it working over time, and it's important not to add bits that add complexity for humans without clear benefits.

fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
}) * EMA_COEFFICIENT as f64
}
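As a quick check of the function above, the sketch below mirrors `calculate_ema` with the same constants (using `powi` in place of the diff's `powf`), runnable outside the pool; the `main` harness is illustrative.

```rust
use std::collections::VecDeque;

const FREQUENCY_QUEUE_SIZE: usize = 10;
const EMA_COEFFICIENT: f64 = 2.0 / (FREQUENCY_QUEUE_SIZE as f64 + 1.0);

// Mirror of the diff's `calculate_ema`, reproduced so it can run standalone.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + (*freq as f64) * (1.0 - EMA_COEFFICIENT).powi(i as i32)
    }) * EMA_COEFFICIENT
}

fn main() {
    // With a window shorter than the configured size, the truncated EMA
    // of a constant signal stays strictly below the raw sample value.
    let queue: VecDeque<u64> = vec![4, 4, 4, 4].into();
    let ema = calculate_ema(&queue);
    assert!(ema > 0.0 && ema < 4.0);
    println!("EMA of a constant 4-sample window: {:.4}", ema);
}
```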

/// Adaptive pool scaling function
///
/// This allows to spawn new threads to make room for incoming task pressure.
/// Works in the background detached from the pool system and scales up the pool based
/// on the request rate.
///
/// It uses frequency based calculation to define work. Utilizing average processing rate.
fn scale_pool() {
A contributor commented:

In another context there was some discussion around adding tracing (using log's trace! macro) to async-std. This function seems like a good place for that, too.

Another commenter replied:

log::trace or the new tracing library? wondering about how much overhead each would add

killercup (Contributor) replied on Aug 28, 2019:

I just know that the idea was thrown around, and it was probably an opt-in feature, too. @stjepang or @yoshuawuyts would know more :)

A contributor replied:

log now has kv support, which we're already using in other places too. tracing can pick these up too, so using log is probably the best choice here overall (:

Another commenter replied:

I think we should get the kv support in tracing-log soon-ish? However I don't believe that log supports the span-like macros, so async-std might need to have its own, internal span-like macro and dispatch conditionally to async-log or tracing.

I haven't considered how the differences between async-log's span and tracing's span can be bridged, but I think it's feasible if tracing's span!().enter() is used to mimic async-log's nested span block.

yoshuawuyts (Contributor) replied on Sep 1, 2019:

@davidbarsky we could use async-log::span for this, probably. Though there's still a few open issues on the repo, the interface wouldn't change.

From convos with Eliza during RustConf there's def a desire to bridge between the two approaches, and we mostly need to figure out how to go about it. async-rs/async-log#7 is what I've currently got (repo incoming soon), but need to check how well that works (:

davidbarsky replied on Sep 1, 2019:

@yoshuawuyts Yep! I think we're on the same page about wanting to bridge the two libraries.

I'm spitballing one mechanism on bridging that gap, which consists of using mutually exclusive feature flags in a build.rs to dispatch to either async-log or tracing. This would entail using an async-std-specific span macro until the gaps—if any exist—between async-std's span and tracing's span are closed. I'm personally excited to dig into rust-lang/log#353!

I'm sorry if what I communicated wasn't clear!

// Fetch the current frequency; it matters that operations are ordered in this approach.
let current_frequency = FREQUENCY.load(Ordering::SeqCst);
let freq_queue_arc = FREQ_QUEUE.clone();
let mut freq_queue = freq_queue_arc.lock().unwrap();

// Seed the window with an initial sample so the calculations below are well-defined
if freq_queue.len() == 0 {
freq_queue.push_back(0);
}

// Calculate message rate for the given time window
let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;

// Calculate the previous time window's EMA value (before the last sample)
let prev_ema_frequency = calculate_ema(&freq_queue);

// Add seen frequency data to the frequency histogram.
freq_queue.push_back(frequency);
if freq_queue.len() == FREQUENCY_QUEUE_SIZE + 1 {
freq_queue.pop_front();
}

// Calculates current time window's EMA value (including last sample)
let curr_ema_frequency = calculate_ema(&freq_queue);

// Adapts the thread count of pool
//
// Sliding window of frequencies visited by the pool manager.
// Pool manager creates EMA value for previous window and current window.
// Compare them to determine scaling amount based on the trends.
// If current EMA value is bigger, we will scale up.
if curr_ema_frequency > prev_ema_frequency {
// "Scale by" amount can be seen as "how much load is coming".
// "Scale" amount is "how many threads we should spawn".
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
let scale = ((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as u64;

// It is time to scale the pool!
(0..scale).for_each(|_| {
create_blocking_thread();
});
} else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
&& current_frequency != 0
{
// Throughput is low. Allocate more threads to unblock flow.
// If we fall into this case, the scheduler is congested by long-hauling tasks.
// To unblock the flow we add some threads to the pool, but not so many
// that they stagger the program's operation.
let scale = LOW_WATERMARK * current_frequency + 1;

// Scale it up!
(0..scale).for_each(|_| {
create_blocking_thread();
});
}

FREQUENCY.store(0, Ordering::Release);
}
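The two scaling branches of `scale_pool` can be condensed into a pure function for illustration. `threads_to_spawn` is a hypothetical name introduced here; the thresholds and arithmetic mirror the diff above.

```rust
const LOW_WATERMARK: u64 = 2;

// How many threads the manager would spawn for a given pair of EMA
// readings and the raw frequency of the current window.
fn threads_to_spawn(prev_ema: f64, curr_ema: f64, current_frequency: u64) -> u64 {
    if curr_ema > prev_ema {
        // Rising trend: scale proportionally to the EMA delta
        // ("how much load is coming" times the low watermark, plus a base).
        let scale_by = curr_ema - prev_ema;
        ((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as u64
    } else if (curr_ema - prev_ema).abs() < std::f64::EPSILON && current_frequency != 0 {
        // Flat trend but tasks are still arriving: the pool is likely
        // blocked by long-hauling tasks, so add a modest burst of threads.
        LOW_WATERMARK * current_frequency + 1
    } else {
        // Falling trend: let the time-based downscaler shrink the pool.
        0
    }
}

fn main() {
    assert_eq!(threads_to_spawn(1.0, 3.0, 5), 6); // rising: 2 * 2.0 + 2
    assert_eq!(threads_to_spawn(2.0, 2.0, 3), 7); // flat but busy: 2 * 3 + 1
    assert_eq!(threads_to_spawn(3.0, 1.0, 0), 0); // falling: no upscaling
}
```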

/// Creates blocking thread to receive tasks
/// Dynamic threads will terminate themselves if they don't
/// receive any work after between one and ten seconds.
fn create_blocking_thread() {
// Check that thread is spawnable.
// If it hits the OS limits, don't spawn it.
{
let current_arc = POOL_SIZE.clone();
let pool_size = *current_arc.lock().unwrap();
if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
MAX_THREADS.store(10_000, Ordering::SeqCst);
return;
}
}
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
@@ -68,29 +236,53 @@ fn maybe_create_another_blocking_thread() {
// Generate a simple random number of milliseconds
let rand_sleep_ms = u64::from(random(10_000));

thread::Builder::new()
let _ = thread::Builder::new()
.name("async-blocking-driver-dynamic".to_string())
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
// Adjust the pool size counter before and after spawn
{
let current_arc = POOL_SIZE.clone();
*current_arc.lock().unwrap() += 1;
}
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
{
let current_arc = POOL_SIZE.clone();
*current_arc.lock().unwrap() -= 1;
}
})
.expect("cannot start a dynamic thread driving blocking tasks");
.map_err(|err| {
match err.kind() {
ErrorKind::WouldBlock => {
// The maximum number of allowed threads per process varies from system to system.
// Also, some systems enforce it (like macOS), and some don't (Linux).
// This case is not expected to happen,
// but when it does it shouldn't cause a panic.
let current_arc = POOL_SIZE.clone();
MAX_THREADS.store(*current_arc.lock().unwrap() - 1, Ordering::SeqCst);
}
_ => eprintln!(
"cannot start a dynamic thread driving blocking tasks: {}",
err
),
}
});
}
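The lifecycle of a dynamic worker (staggered timeout, then retirement) can be sketched with std's mpsc channel standing in for the pool's crossbeam channel; the names, channel type, and durations here are illustrative, not the crate's.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Loop on the shared receiver with a staggered timeout, run whatever
// arrives, and retire once the timeout elapses with no work.
fn run_worker(receiver: Receiver<&'static str>, wait_limit: Duration) -> usize {
    let mut processed = 0;
    loop {
        match receiver.recv_timeout(wait_limit) {
            // Run the task; the real pool wraps this in `abort_on_panic`.
            Ok(task) => {
                println!("ran task: {}", task);
                processed += 1;
            }
            // No work within the window: the thread retires itself.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    processed
}

fn main() {
    let (sender, receiver) = channel();
    // Stands in for the real 1s base plus up to 10s of random jitter.
    let wait_limit = Duration::from_millis(50);

    let worker = thread::spawn(move || run_worker(receiver, wait_limit));

    sender.send("job-1").unwrap();
    sender.send("job-2").unwrap();
    drop(sender); // no more work; the worker drains the queue and exits

    assert_eq!(worker.join().unwrap(), 2);
}
```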

// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on previous statistics, rather than on whether
/// a thread is ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
// Count every incoming scheduled task
FREQUENCY.fetch_add(1, Ordering::Acquire);

if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
maybe_create_another_blocking_thread();
// blocking.
POOL.sender.send(err.into_inner()).unwrap();
}
}
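The try-then-block hand-off in `schedule` can be sketched with std's rendezvous channel (`sync_channel(0)`) standing in for crossbeam's `bounded(0)`; the `dispatch` helper is a name invented for this example.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;

// Try a nonblocking hand-off first, and fall back to a blocking send
// when no worker is parked on the receiver yet.
fn dispatch(sender: &SyncSender<u32>, task: u32) {
    match sender.try_send(task) {
        Ok(()) => {}
        // No receiver was ready: block until a worker picks the task up.
        // (The real pool first bumps FREQUENCY so the manager can react.)
        Err(TrySendError::Full(task)) => sender.send(task).unwrap(),
        Err(TrySendError::Disconnected(_)) => panic!("pool receiver dropped"),
    }
}

fn main() {
    let (sender, receiver) = sync_channel::<u32>(0);
    let worker = thread::spawn(move || receiver.recv().unwrap());

    dispatch(&sender, 42);
    assert_eq!(worker.join().unwrap(), 42);
}
```

With a zero-capacity channel the kernel's scheduler effectively becomes the queue, which is the design rationale the diff's comments describe.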
2 changes: 1 addition & 1 deletion src/task/mod.rs
@@ -34,4 +34,4 @@ mod pool;
mod sleep;
mod task;

pub(crate) mod blocking;
pub mod blocking;