From 0e6b30fd6a245b84dac0e94c884e587df027c5c1 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Sun, 25 Aug 2019 01:34:50 +0200 Subject: [PATCH 01/20] EMA based statistically adaptive blocking thread pool --- benches/blocking.rs | 26 +++++++++++++ src/task/blocking.rs | 87 ++++++++++++++++++++++++++++++++++---------- src/task/mod.rs | 2 +- 3 files changed, 94 insertions(+), 21 deletions(-) create mode 100644 benches/blocking.rs diff --git a/benches/blocking.rs b/benches/blocking.rs new file mode 100644 index 000000000..29e5bc201 --- /dev/null +++ b/benches/blocking.rs @@ -0,0 +1,26 @@ +#![feature(test)] + +extern crate test; + +use async_std::task; +use async_std::task_local; +use test::{black_box, Bencher}; +use std::thread; +use std::time::Duration; +use async_std::task::blocking::JoinHandle; +use futures::future::{join_all}; + + +#[bench] +fn blocking(b: &mut Bencher) { + b.iter(|| { + let handles = (0..10_000).map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }) + }).collect::>>(); + + task::block_on(join_all(handles)); + }); +} diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 41177bc93..aa7876873 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -13,23 +13,34 @@ use crate::future::Future; use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; +const LOW_WATERMARK: u64 = 2; const MAX_THREADS: u64 = 10_000; -static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); +// Pool task frequency calculation variables +static AVR_FREQUENCY: AtomicU64 = AtomicU64::new(0); +static FREQUENCY: AtomicU64 = AtomicU64::new(0); + +// Pool speedup calculation variables +static SPEEDUP: AtomicU64 = AtomicU64::new(0); + +// Pool size variables +static EXPECTED_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK); +static CURRENT_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK); struct Pool { sender: Sender>, - receiver: Receiver>, + receiver: Receiver> } lazy_static! { static ref POOL: Pool = { - for _ in 0..2 { + for _ in 0..LOW_WATERMARK { thread::Builder::new() .name("async-blocking-driver".to_string()) .spawn(|| abort_on_panic(|| { for task in &POOL.receiver { task.run(); + calculate_dispatch_frequency(); } })) .expect("cannot start a thread driving blocking tasks"); @@ -47,18 +58,30 @@ lazy_static! { }; } -// Create up to MAX_THREADS dynamic blocking task worker threads. -// Dynamic threads will terminate themselves if they don't -// receive any work after between one and ten seconds. -fn maybe_create_another_blocking_thread() { - // We use a `Relaxed` atomic operation because - // it's just a heuristic, and would not lose correctness - // even if it's random. - let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); - if workers >= MAX_THREADS { - return; +fn calculate_dispatch_frequency() { + // Calculate current message processing rate here + let previous_freq = FREQUENCY.fetch_sub(1, Ordering::Relaxed); + let avr_freq = AVR_FREQUENCY.load(Ordering::Relaxed); + let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed); + let frequency = (avr_freq as f64 + previous_freq as f64 / current_pool_size as f64) as u64; + AVR_FREQUENCY.store(frequency, Ordering::Relaxed); + + // Adapt the thread count of pool + let speedup = SPEEDUP.load(Ordering::Relaxed); + if frequency > speedup { + // Speedup can be gained. Scale the pool up here. + SPEEDUP.store(frequency, Ordering::Relaxed); + EXPECTED_POOL_SIZE.store(current_pool_size + 1, Ordering::Relaxed); + } else { + // There is no need for the extra threads, schedule them to be closed. + EXPECTED_POOL_SIZE.fetch_sub(2, Ordering::Relaxed); } +} +// Creates yet another thread to receive tasks. +// Dynamic threads will terminate themselves if they don't +// receive any work after between one and ten seconds. +fn create_blocking_thread() { // We want to avoid having all threads terminate at // exactly the same time, causing thundering herd // effects. We want to stagger their destruction over @@ -73,25 +96,49 @@ fn maybe_create_another_blocking_thread() { .spawn(move || { let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); - DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + CURRENT_POOL_SIZE.fetch_add(1, Ordering::Relaxed); while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { abort_on_panic(|| task.run()); + calculate_dispatch_frequency(); } - DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); + CURRENT_POOL_SIZE.fetch_sub(1, Ordering::Relaxed); }) .expect("cannot start a dynamic thread driving blocking tasks"); } // Enqueues work, attempting to send to the threadpool in a -// nonblocking way and spinning up another worker thread if -// there is not a thread ready to accept the work. +// nonblocking way and spinning up needed amount of threads +// based on the previous statistics without relying on +// if there is not a thread ready to accept the work or not. fn schedule(t: async_task::Task<()>) { + // Add up for every incoming task schedule + FREQUENCY.fetch_add(1, Ordering::Relaxed); + + // Calculate the amount of threads needed to spin up + // then retry sending while blocking. It doesn't spin if + // expected pool size is above the MAX_THREADS (which is a + // case won't happen) + let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); + let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed); + if pool_size > current_pool_size && pool_size <= MAX_THREADS { + let needed = pool_size - current_pool_size; + + // For safety, check boundaries before spawning threads + if needed > 0 && (needed < pool_size || needed < current_pool_size) { + (0..needed).for_each(|_| { + create_blocking_thread(); + }); + } + } + if let Err(err) = POOL.sender.try_send(t) { // We were not able to send to the channel without - // blocking. Try to spin up another thread and then - // retry sending while blocking. - maybe_create_another_blocking_thread(); + // blocking. POOL.sender.send(err.into_inner()).unwrap(); + } else { + // Every successful dispatch, rewarded with negative + let reward = AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64; + EXPECTED_POOL_SIZE.fetch_sub(reward as u64, Ordering::Relaxed); } } diff --git a/src/task/mod.rs b/src/task/mod.rs index 42b7e0883..e654fdfbd 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -34,4 +34,4 @@ mod pool; mod sleep; mod task; -pub(crate) mod blocking; +pub mod blocking; From 4a1afcd7e0b727cbaf35c43b4d454e96c785a102 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Sun, 25 Aug 2019 01:57:24 +0200 Subject: [PATCH 02/20] Make cargo fmt happy --- benches/blocking.rs | 20 ++++++++++---------- src/task/blocking.rs | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/benches/blocking.rs b/benches/blocking.rs index 29e5bc201..928ff1ade 100644 --- a/benches/blocking.rs +++ b/benches/blocking.rs @@ -3,23 +3,23 @@ extern crate test; use async_std::task; -use async_std::task_local; -use test::{black_box, Bencher}; +use async_std::task::blocking::JoinHandle; +use futures::future::join_all; use std::thread; use std::time::Duration; -use async_std::task::blocking::JoinHandle; -use futures::future::{join_all}; - +use test::Bencher; #[bench] fn blocking(b: &mut Bencher) { b.iter(|| { - let handles = (0..10_000).map(|_| { - task::blocking::spawn(async { - let duration = Duration::from_millis(1); - thread::sleep(duration); + let handles = (0..10_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }) }) - }).collect::>>(); + .collect::>>(); task::block_on(join_all(handles)); }); diff --git a/src/task/blocking.rs b/src/task/blocking.rs index aa7876873..ac12c8390 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -29,7 +29,7 @@ static CURRENT_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK); struct Pool { sender: Sender>, - receiver: Receiver> + receiver: Receiver>, } lazy_static! { From 7f237b7f1df3295ea937a22fcefd7f08ab4a991b Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 26 Aug 2019 11:41:16 +0200 Subject: [PATCH 03/20] Use saturating_sub as a guard against low watermark --- src/task/blocking.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index ac12c8390..0fe83359e 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -74,7 +74,11 @@ fn calculate_dispatch_frequency() { EXPECTED_POOL_SIZE.store(current_pool_size + 1, Ordering::Relaxed); } else { // There is no need for the extra threads, schedule them to be closed. - EXPECTED_POOL_SIZE.fetch_sub(2, Ordering::Relaxed); + let expected = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); + if 1 + LOW_WATERMARK < expected { + // Substract amount of low watermark + EXPECTED_POOL_SIZE.fetch_sub(LOW_WATERMARK, Ordering::Relaxed); + } } } @@ -120,10 +124,13 @@ fn schedule(t: async_task::Task<()>) { // case won't happen) let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed); + let reward = (AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64) as u64; + if pool_size > current_pool_size && pool_size <= MAX_THREADS { - let needed = pool_size - current_pool_size; + let needed = pool_size.saturating_sub(current_pool_size); - // For safety, check boundaries before spawning threads + // For safety, check boundaries before spawning threads. + // This also won't be expected to happen. But better safe than sorry. if needed > 0 && (needed < pool_size || needed < current_pool_size) { (0..needed).for_each(|_| { create_blocking_thread(); @@ -137,8 +144,9 @@ fn schedule(t: async_task::Task<()>) { POOL.sender.send(err.into_inner()).unwrap(); } else { // Every successful dispatch, rewarded with negative - let reward = AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64; - EXPECTED_POOL_SIZE.fetch_sub(reward as u64, Ordering::Relaxed); + if reward + LOW_WATERMARK < pool_size { + EXPECTED_POOL_SIZE.fetch_sub(reward, Ordering::Relaxed); + } } } From 55664f1b42b35d6a4c75c8a7a778342e0eafe302 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 26 Aug 2019 11:56:03 +0200 Subject: [PATCH 04/20] Make frequency calculation more clear --- src/task/blocking.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 0fe83359e..56ce153b5 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -60,10 +60,10 @@ lazy_static! { fn calculate_dispatch_frequency() { // Calculate current message processing rate here - let previous_freq = FREQUENCY.fetch_sub(1, Ordering::Relaxed); + let current_freq = FREQUENCY.fetch_sub(1, Ordering::Relaxed); let avr_freq = AVR_FREQUENCY.load(Ordering::Relaxed); let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed); - let frequency = (avr_freq as f64 + previous_freq as f64 / current_pool_size as f64) as u64; + let frequency = (avr_freq as f64 + current_freq as f64 / current_pool_size as f64) as u64; AVR_FREQUENCY.store(frequency, Ordering::Relaxed); // Adapt the thread count of pool From ecfce3a2f22b62e37cebe7cab3befb98a838e4f0 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 26 Aug 2019 13:42:55 +0200 Subject: [PATCH 05/20] Use guaranteed delta history for changes in current pool size --- src/task/blocking.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 56ce153b5..74579fa80 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -100,12 +100,12 @@ fn create_blocking_thread() { .spawn(move || { let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); - CURRENT_POOL_SIZE.fetch_add(1, Ordering::Relaxed); + CURRENT_POOL_SIZE.fetch_add(1, Ordering::SeqCst); while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { abort_on_panic(|| task.run()); calculate_dispatch_frequency(); } - CURRENT_POOL_SIZE.fetch_sub(1, Ordering::Relaxed); + CURRENT_POOL_SIZE.fetch_sub(1, Ordering::SeqCst); }) .expect("cannot start a dynamic thread driving blocking tasks"); } @@ -123,7 +123,7 @@ fn schedule(t: async_task::Task<()>) { // expected pool size is above the MAX_THREADS (which is a // case won't happen) let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); - let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed); + let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::SeqCst); let reward = (AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64) as u64; if pool_size > current_pool_size && pool_size <= MAX_THREADS { From 1de75ee59496c8a4a99208609a74cf97d56f22eb Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 26 Aug 2019 14:20:23 +0200 Subject: [PATCH 06/20] Use longer spanning saturated joins during scale down --- src/task/blocking.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 74579fa80..95c20a687 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -75,7 +75,7 @@ fn calculate_dispatch_frequency() { } else { // There is no need for the extra threads, schedule them to be closed. let expected = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); - if 1 + LOW_WATERMARK < expected { + if 2 * LOW_WATERMARK < expected { // Substract amount of low watermark EXPECTED_POOL_SIZE.fetch_sub(LOW_WATERMARK, Ordering::Relaxed); } @@ -144,7 +144,7 @@ fn schedule(t: async_task::Task<()>) { POOL.sender.send(err.into_inner()).unwrap(); } else { // Every successful dispatch, rewarded with negative - if reward + LOW_WATERMARK < pool_size { + if reward + (2 * LOW_WATERMARK) < pool_size { EXPECTED_POOL_SIZE.fetch_sub(reward, Ordering::Relaxed); } } From 07eac6854da7df6d11dd44211fee199188976802 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 26 Aug 2019 22:43:23 +0200 Subject: [PATCH 07/20] r/w ordering guarantees for atomics --- src/task/blocking.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 95c20a687..c1ac62a3d 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -60,9 +60,9 @@ lazy_static! { fn calculate_dispatch_frequency() { // Calculate current message processing rate here - let current_freq = FREQUENCY.fetch_sub(1, Ordering::Relaxed); + let current_freq = FREQUENCY.fetch_sub(1, Ordering::Release); let avr_freq = AVR_FREQUENCY.load(Ordering::Relaxed); - let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed); + let current_pool_size = LOW_WATERMARK.max(CURRENT_POOL_SIZE.load(Ordering::Acquire)); let frequency = (avr_freq as f64 + current_freq as f64 / current_pool_size as f64) as u64; AVR_FREQUENCY.store(frequency, Ordering::Relaxed); @@ -116,7 +116,7 @@ fn create_blocking_thread() { // if there is not a thread ready to accept the work or not. fn schedule(t: async_task::Task<()>) { // Add up for every incoming task schedule - FREQUENCY.fetch_add(1, Ordering::Relaxed); + FREQUENCY.fetch_add(1, Ordering::Acquire); // Calculate the amount of threads needed to spin up // then retry sending while blocking. It doesn't spin if @@ -124,7 +124,7 @@ fn schedule(t: async_task::Task<()>) { // case won't happen) let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::SeqCst); - let reward = (AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64) as u64; + let reward = (AVR_FREQUENCY.load(Ordering::Acquire) as f64 / 2.0_f64) as u64; if pool_size > current_pool_size && pool_size <= MAX_THREADS { let needed = pool_size.saturating_sub(current_pool_size); From 08c8e0441b0d13d493043df420562182414e4467 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 28 Aug 2019 04:42:19 +0200 Subject: [PATCH 08/20] Use thread adaptation mechanism --- benches/blocking.rs | 12 ++++ src/task/blocking.rs | 163 ++++++++++++++++++++++++++++--------------- tests/thread_pool.rs | 141 +++++++++++++++++++++++++++++++++++++ 3 files changed, 258 insertions(+), 58 deletions(-) create mode 100644 tests/thread_pool.rs diff --git a/benches/blocking.rs b/benches/blocking.rs index 928ff1ade..f9e659748 100644 --- a/benches/blocking.rs +++ b/benches/blocking.rs @@ -9,6 +9,7 @@ use std::thread; use std::time::Duration; use test::Bencher; +// Benchmark for a 10K burst task spawn #[bench] fn blocking(b: &mut Bencher) { b.iter(|| { @@ -24,3 +25,14 @@ fn blocking(b: &mut Bencher) { task::block_on(join_all(handles)); }); } + +// Benchmark for a single blocking task spawn +#[bench] +fn blocking_single(b: &mut Bencher) { + b.iter(|| { + task::blocking::spawn(async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }) + }); +} diff --git a/src/task/blocking.rs b/src/task/blocking.rs index c1ac62a3d..a2297e71e 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -1,5 +1,6 @@ //! A thread pool for running blocking functions asynchronously. +use std::collections::VecDeque; use std::fmt; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; @@ -12,20 +13,29 @@ use lazy_static::lazy_static; use crate::future::Future; use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; +use std::sync::{Arc, Mutex}; +// Low watermark value, defines the bare minimum of the pool. +// Spawns initial thread set. const LOW_WATERMARK: u64 = 2; -const MAX_THREADS: u64 = 10_000; -// Pool task frequency calculation variables -static AVR_FREQUENCY: AtomicU64 = AtomicU64::new(0); -static FREQUENCY: AtomicU64 = AtomicU64::new(0); +// Pool managers interval time (milliseconds) +// This is the actual interval which makes adaptation calculation +const MANAGER_POLL_INTERVAL: u64 = 50; -// Pool speedup calculation variables -static SPEEDUP: AtomicU64 = AtomicU64::new(0); +// Frequency scale factor for thread adaptation calculation +const FREQUENCY_SCALE_FACTOR: u64 = 200; -// Pool size variables -static EXPECTED_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK); -static CURRENT_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK); +// Frequency histogram's sliding window size +// Defines how many frequencies will be considered for adaptation. +const FREQUENCY_QUEUE_SIZE: usize = 10; + +// Possible max threads (without OS contract) +const MAX_THREADS: u64 = 10_000; + +// Pool task frequency variable +// Holds scheduled tasks onto the thread pool for the calculation window +static FREQUENCY: AtomicU64 = AtomicU64::new(0); struct Pool { sender: Sender>, @@ -40,12 +50,22 @@ lazy_static! { .spawn(|| abort_on_panic(|| { for task in &POOL.receiver { task.run(); - calculate_dispatch_frequency(); } })) .expect("cannot start a thread driving blocking tasks"); } + thread::Builder::new() + .name("async-pool-manager".to_string()) + .spawn(|| abort_on_panic(|| { + let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL); + loop { + scale_pool(); + thread::sleep(poll_interval); + } + })) + .expect("thread pool manager cannot be started"); + // We want to use an unbuffered channel here to help // us drive our dynamic control. In effect, the // kernel's scheduler becomes the queue, reducing @@ -56,30 +76,76 @@ lazy_static! { let (sender, receiver) = bounded(0); Pool { sender, receiver } }; + + // Pool task frequency calculation variables + static ref FREQ_QUEUE: Arc>> = { + Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1))) + }; + + // Pool size variable + static ref POOL_SIZE: Arc> = Arc::new(Mutex::new(LOW_WATERMARK)); } -fn calculate_dispatch_frequency() { - // Calculate current message processing rate here - let current_freq = FREQUENCY.fetch_sub(1, Ordering::Release); - let avr_freq = AVR_FREQUENCY.load(Ordering::Relaxed); - let current_pool_size = LOW_WATERMARK.max(CURRENT_POOL_SIZE.load(Ordering::Acquire)); - let frequency = (avr_freq as f64 + current_freq as f64 / current_pool_size as f64) as u64; - AVR_FREQUENCY.store(frequency, Ordering::Relaxed); - - // Adapt the thread count of pool - let speedup = SPEEDUP.load(Ordering::Relaxed); - if frequency > speedup { - // Speedup can be gained. Scale the pool up here. - SPEEDUP.store(frequency, Ordering::Relaxed); - EXPECTED_POOL_SIZE.store(current_pool_size + 1, Ordering::Relaxed); - } else { - // There is no need for the extra threads, schedule them to be closed. - let expected = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); - if 2 * LOW_WATERMARK < expected { - // Substract amount of low watermark - EXPECTED_POOL_SIZE.fetch_sub(LOW_WATERMARK, Ordering::Relaxed); +// Gets the current pool size +// Used for pool size boundary checking in pool manager +fn get_current_pool_size() -> u64 { + let current_arc = POOL_SIZE.clone(); + let current_pool_size = *current_arc.lock().unwrap(); + LOW_WATERMARK.max(current_pool_size) +} + +// Adaptive pool scaling function +// +// This allows to spawn new threads to make room for incoming task pressure. +// Works in the background detached from the pool system and scales up the pool based +// on the request rate. +// +// It uses frequency based calculation to define work. Utilizing average processing rate. +fn scale_pool() { + // Fetch current frequency, it does matter that operations are ordered in this approach. + let current_frequency = FREQUENCY.load(Ordering::SeqCst); + let freq_queue_arc = FREQ_QUEUE.clone(); + let mut freq_queue = freq_queue_arc.lock().unwrap(); + + // Make it safe to start for calculations by adding initial frequency scale + if freq_queue.len() == 0 { + freq_queue.push_back(0); + } + + // Calculate message rate at the given time window + // Factor is there for making adaptation linear. + // Exponential bursts may create idle threads which won't be utilized. + // They may actually cause slowdown. + let current_freq_factorized = current_frequency as f64 * FREQUENCY_SCALE_FACTOR as f64; + let frequency = (current_freq_factorized / MANAGER_POLL_INTERVAL as f64) as u64; + + // Adapts the thread count of pool + // + // Sliding window of frequencies visited by the pool manager. + // Select the maximum from the window and check against the current task dispatch frequency. + // If current frequency is bigger, we will scale up. + if let Some(&max_measurement) = freq_queue.iter().max() { + if frequency > max_measurement { + let scale_by = num_cpus::get().max(LOW_WATERMARK as usize) as u64; + + // Pool size can't reach to max_threads anyway. + // Pool manager backpressures itself while visiting message rate frequencies. + // You will get an error before hitting to limits by OS. + if get_current_pool_size() < MAX_THREADS { + (0..scale_by).for_each(|_| { + create_blocking_thread(); + }); + } } } + + // Add seen frequency data to the frequency histogram. + freq_queue.push_back(frequency); + if freq_queue.len() == FREQUENCY_QUEUE_SIZE + 1 { + freq_queue.pop_front(); + } + + FREQUENCY.store(0, Ordering::Release); } // Creates yet another thread to receive tasks. @@ -100,12 +166,18 @@ fn create_blocking_thread() { .spawn(move || { let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); - CURRENT_POOL_SIZE.fetch_add(1, Ordering::SeqCst); + // Adjust the pool size counter before and after spawn + { + let current_arc = POOL_SIZE.clone(); + *current_arc.lock().unwrap() += 1; + } while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { abort_on_panic(|| task.run()); - calculate_dispatch_frequency(); } - CURRENT_POOL_SIZE.fetch_sub(1, Ordering::SeqCst); + { + let current_arc = POOL_SIZE.clone(); + *current_arc.lock().unwrap() -= 1; + } }) .expect("cannot start a dynamic thread driving blocking tasks"); } @@ -118,35 +190,10 @@ fn schedule(t: async_task::Task<()>) { // Add up for every incoming task schedule FREQUENCY.fetch_add(1, Ordering::Acquire); - // Calculate the amount of threads needed to spin up - // then retry sending while blocking. It doesn't spin if - // expected pool size is above the MAX_THREADS (which is a - // case won't happen) - let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed); - let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::SeqCst); - let reward = (AVR_FREQUENCY.load(Ordering::Acquire) as f64 / 2.0_f64) as u64; - - if pool_size > current_pool_size && pool_size <= MAX_THREADS { - let needed = pool_size.saturating_sub(current_pool_size); - - // For safety, check boundaries before spawning threads. - // This also won't be expected to happen. But better safe than sorry. - if needed > 0 && (needed < pool_size || needed < current_pool_size) { - (0..needed).for_each(|_| { - create_blocking_thread(); - }); - } - } - if let Err(err) = POOL.sender.try_send(t) { // We were not able to send to the channel without // blocking. POOL.sender.send(err.into_inner()).unwrap(); - } else { - // Every successful dispatch, rewarded with negative - if reward + (2 * LOW_WATERMARK) < pool_size { - EXPECTED_POOL_SIZE.fetch_sub(reward, Ordering::Relaxed); - } } } diff --git a/tests/thread_pool.rs b/tests/thread_pool.rs new file mode 100644 index 000000000..08bb470ac --- /dev/null +++ b/tests/thread_pool.rs @@ -0,0 +1,141 @@ +use async_std::task; +use async_std::task::blocking::JoinHandle; +use futures::future::join_all; +use std::thread; +use std::time::Duration; +use std::time::Instant; + +// Test for slow joins without task bursts during joins. +#[test] +fn slow_join() { + let thread_join_time_max = 11_000; + let start = Instant::now(); + + // Send an initial batch of million bursts. + let handles = (0..1_000_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }) + }) + .collect::>>(); + + task::block_on(join_all(handles)); + + // Let them join to see how it behaves under different workloads. + let duration = Duration::from_millis(thread_join_time_max); + thread::sleep(duration); + + // Spawn yet another batch of work on top of it + let handles = (0..10_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(100); + thread::sleep(duration); + }) + }) + .collect::>>(); + + task::block_on(join_all(handles)); + + // Slow joins shouldn't cause internal slow down + let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128; + println!("Slow task join. Monotonic exec time: {:?} ns", elapsed); + + // Should be less than 25_000 ns + // Previous implementation is around this threshold. + assert_eq!(elapsed < 25_000, true); +} + +// Test for slow joins with task burst. +#[test] +fn slow_join_interrupted() { + let thread_join_time_max = 2_000; + let start = Instant::now(); + + // Send an initial batch of million bursts. + let handles = (0..1_000_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(1); + thread::sleep(duration); + }) + }) + .collect::>>(); + + task::block_on(join_all(handles)); + + // Let them join to see how it behaves under different workloads. + // This time join under the time window. + let duration = Duration::from_millis(thread_join_time_max); + thread::sleep(duration); + + // Spawn yet another batch of work on top of it + let handles = (0..10_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(100); + thread::sleep(duration); + }) + }) + .collect::>>(); + + task::block_on(join_all(handles)); + + // Slow joins shouldn't cause internal slow down + let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128; + println!("Slow task join. Monotonic exec time: {:?} ns", elapsed); + + // Should be less than 25_000 ns + // Previous implementation is around this threshold. + assert_eq!(elapsed < 25_000, true); +} + +// This test is expensive but it proves that longhauling tasks are working in adaptive thread pool. +// Thread pool which spawns on-demand will panic with this test. +#[test] +#[ignore] +fn longhauling_task_join() { + let thread_join_time_max = 11_000; + let start = Instant::now(); + + // First batch of overhauling tasks + let handles = (0..100_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(1000); + thread::sleep(duration); + }) + }) + .collect::>>(); + + task::block_on(join_all(handles)); + + // Let them join to see how it behaves under different workloads. + let duration = Duration::from_millis(thread_join_time_max); + thread::sleep(duration); + + // Send yet another medium sized batch to see how it scales. + let handles = (0..10_000) + .map(|_| { + task::blocking::spawn(async { + let duration = Duration::from_millis(100); + thread::sleep(duration); + }) + }) + .collect::>>(); + + task::block_on(join_all(handles)); + + // Slow joins shouldn't cause internal slow down + let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128; + println!( + "Long-hauling task join. Monotonic exec time: {:?} ns", + elapsed + ); + + // Should be less than 200_000 ns + // Previous implementation will panic when this test is running. + assert_eq!(elapsed < 200_000, true); +} From 2dca64d26df0dc06081cd442fc7d99b213c4d532 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 28 Aug 2019 04:44:35 +0200 Subject: [PATCH 09/20] Documentation for thread scale --- src/task/blocking.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index a2297e71e..ed29644a8 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -126,6 +126,8 @@ fn scale_pool() { // If current frequency is bigger, we will scale up. if let Some(&max_measurement) = freq_queue.iter().max() { if frequency > max_measurement { + // Don't spawn more than cores. + // Default behaviour of most of the linear adapting thread pools. let scale_by = num_cpus::get().max(LOW_WATERMARK as usize) as u64; // Pool size can't reach to max_threads anyway. From d9785cc72552df3bb336386d64ea391b4dfa1aa9 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 28 Aug 2019 05:05:16 +0200 Subject: [PATCH 10/20] Ignore concurrency tests --- tests/thread_pool.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/thread_pool.rs b/tests/thread_pool.rs index 08bb470ac..64b81e6e3 100644 --- a/tests/thread_pool.rs +++ b/tests/thread_pool.rs @@ -7,6 +7,7 @@ use std::time::Instant; // Test for slow joins without task bursts during joins. #[test] +#[ignore] fn slow_join() { let thread_join_time_max = 11_000; let start = Instant::now(); @@ -50,6 +51,7 @@ fn slow_join() { // Test for slow joins with task burst. #[test] +#[ignore] fn slow_join_interrupted() { let thread_join_time_max = 2_000; let start = Instant::now(); From 9135ca01609953b0f1c4f9f0599ad131217975cc Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Wed, 28 Aug 2019 16:57:38 +0200 Subject: [PATCH 11/20] Pool manager thread documentation --- src/task/blocking.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index ed29644a8..8b42900e7 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -55,6 +55,8 @@ lazy_static! { .expect("cannot start a thread driving blocking tasks"); } + // Pool manager to check frequency of task rates + // and take action by scaling the pool accordingly. thread::Builder::new() .name("async-pool-manager".to_string()) .spawn(|| abort_on_panic(|| { From 4606893a5e2372168e028fd146c0fcfde632741b Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 00:00:56 +0200 Subject: [PATCH 12/20] Move to the exponential scaling algorithm --- src/task/blocking.rs | 61 +++++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 8b42900e7..98e77ee27 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -21,15 +21,16 @@ const LOW_WATERMARK: u64 = 2; // Pool managers interval time (milliseconds) // This is the actual interval which makes adaptation calculation -const MANAGER_POLL_INTERVAL: u64 = 50; - -// Frequency scale factor for thread adaptation calculation -const FREQUENCY_SCALE_FACTOR: u64 = 200; +const MANAGER_POLL_INTERVAL: u64 = 200; // Frequency histogram's sliding window size // Defines how many frequencies will be considered for adaptation. const FREQUENCY_QUEUE_SIZE: usize = 10; +// Exponential moving average smoothing coefficient for limited window +// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size. +const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64); + // Possible max threads (without OS contract) const MAX_THREADS: u64 = 10_000; @@ -96,6 +97,12 @@ fn get_current_pool_size() -> u64 { LOW_WATERMARK.max(current_pool_size) } +fn calculate_ema(freq_queue: &VecDeque) -> f64 { + freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| { + acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64)) + }) * EMA_COEFFICIENT as f64 +} + // Adaptive pool scaling function // // This allows to spawn new threads to make room for incoming task pressure. @@ -114,34 +121,15 @@ fn scale_pool() { freq_queue.push_back(0); } - // Calculate message rate at the given time window - // Factor is there for making adaptation linear. - // Exponential bursts may create idle threads which won't be utilized. - // They may actually cause slowdown. - let current_freq_factorized = current_frequency as f64 * FREQUENCY_SCALE_FACTOR as f64; - let frequency = (current_freq_factorized / MANAGER_POLL_INTERVAL as f64) as u64; + // Calculate message rate for the given time window + let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64; // Adapts the thread count of pool // // Sliding window of frequencies visited by the pool manager. // Select the maximum from the window and check against the current task dispatch frequency. // If current frequency is bigger, we will scale up. - if let Some(&max_measurement) = freq_queue.iter().max() { - if frequency > max_measurement { - // Don't spawn more than cores. - // Default behaviour of most of the linear adapting thread pools. - let scale_by = num_cpus::get().max(LOW_WATERMARK as usize) as u64; - - // Pool size can't reach to max_threads anyway. - // Pool manager backpressures itself while visiting message rate frequencies. - // You will get an error before hitting to limits by OS. - if get_current_pool_size() < MAX_THREADS { - (0..scale_by).for_each(|_| { - create_blocking_thread(); - }); - } - } - } + let prev_ema_frequency = calculate_ema(&freq_queue); // Add seen frequency data to the frequency histogram. freq_queue.push_back(frequency); @@ -149,6 +137,27 @@ fn scale_pool() { freq_queue.pop_front(); } + let curr_ema_frequency = calculate_ema(&freq_queue); + + if curr_ema_frequency > prev_ema_frequency { + let scale_by: f64 = curr_ema_frequency - prev_ema_frequency; + let scale = ((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as u64; + + // Pool size shouldn't reach to max_threads anyway. + // Pool manager backpressures itself while visiting message rate frequencies. + // You will get an error before hitting to limits by OS. + (0..scale).for_each(|_| { + create_blocking_thread(); + }); + } else if curr_ema_frequency == prev_ema_frequency && current_frequency != 0 { + // Throughput is low. Allocate more threads to unblock flow. + let scale = LOW_WATERMARK * current_frequency + 1; + + (0..scale).for_each(|_| { + create_blocking_thread(); + }); + } + FREQUENCY.store(0, Ordering::Release); } From 73ccc67bf859af671d364effdf49d46ba4ac8c07 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 01:58:31 +0200 Subject: [PATCH 13/20] Solve pool max threads problem for MacOS --- src/task/blocking.rs | 86 ++++++++++++++++++++++++++++++++------------ tests/thread_pool.rs | 4 +-- 2 files changed, 64 insertions(+), 26 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 98e77ee27..d45c90428 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -11,6 +11,7 @@ use crossbeam_channel::{bounded, Receiver, Sender}; use lazy_static::lazy_static; use crate::future::Future; +use crate::io::ErrorKind; use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; use std::sync::{Arc, Mutex}; @@ -31,13 +32,13 @@ const FREQUENCY_QUEUE_SIZE: usize = 10; // Smoothing factor is estimated with: 2 / (N + 1) where N is sample size. const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64); -// Possible max threads (without OS contract) -const MAX_THREADS: u64 = 10_000; - // Pool task frequency variable // Holds scheduled tasks onto the thread pool for the calculation window static FREQUENCY: AtomicU64 = AtomicU64::new(0); +// Possible max threads (without OS contract) +static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000); + struct Pool { sender: Sender>, receiver: Receiver>, @@ -89,14 +90,19 @@ lazy_static! { static ref POOL_SIZE: Arc> = Arc::new(Mutex::new(LOW_WATERMARK)); } -// Gets the current pool size -// Used for pool size boundary checking in pool manager -fn get_current_pool_size() -> u64 { - let current_arc = POOL_SIZE.clone(); - let current_pool_size = *current_arc.lock().unwrap(); - LOW_WATERMARK.max(current_pool_size) -} - +// Exponentially Weighted Moving Average calculation +// +// This allows us to find the EMA value. +// This value represents the trend of tasks mapped onto the thread pool. +// Calculation is following: +// +// α :: EMA_COEFFICIENT :: smoothing factor between 0 and 1 +// Yt :: freq :: frequency sample at time t +// St :: acc :: EMA at time t +// +// Under these definitions formula is following: +// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St +#[inline] fn calculate_ema(freq_queue: &VecDeque) -> f64 { freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| { acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64)) @@ -124,11 +130,7 @@ fn scale_pool() { // Calculate message rate for the given time window let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64; - // Adapts the thread count of pool - // - // Sliding window of frequencies visited by the pool manager. - // Select the maximum from the window and check against the current task dispatch frequency. - // If current frequency is bigger, we will scale up. + // Calculates current time window's EMA value (including last sample) let prev_ema_frequency = calculate_ema(&freq_queue); // Add seen frequency data to the frequency histogram. @@ -137,22 +139,35 @@ fn scale_pool() { freq_queue.pop_front(); } + // Calculates current time window's EMA value (including last sample) let curr_ema_frequency = calculate_ema(&freq_queue); + // Adapts the thread count of pool + // + // Sliding window of frequencies visited by the pool manager. + // Pool manager creates EMA value for previous window and current window. + // Compare them to determine scaling amount based on the trends. + // If current EMA value is bigger, we will scale up. if curr_ema_frequency > prev_ema_frequency { + // "Scale by" amount can be seen as "how much load is coming". + // "Scale" amount is "how many threads we should spawn". let scale_by: f64 = curr_ema_frequency - prev_ema_frequency; let scale = ((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as u64; - // Pool size shouldn't reach to max_threads anyway. - // Pool manager backpressures itself while visiting message rate frequencies. - // You will get an error before hitting to limits by OS. + // It is time to scale the pool! (0..scale).for_each(|_| { create_blocking_thread(); }); - } else if curr_ema_frequency == prev_ema_frequency && current_frequency != 0 { + } else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON + && current_frequency != 0 + { // Throughput is low. Allocate more threads to unblock flow. + // If we fall to this case, scheduler is congested by longhauling tasks. + // For unblock the flow we should add up some threads to the pool, but not that much to + // stagger the program's operation. let scale = LOW_WATERMARK * current_frequency + 1; + // Scale it up! (0..scale).for_each(|_| { create_blocking_thread(); }); @@ -165,6 +180,16 @@ fn scale_pool() { // Dynamic threads will terminate themselves if they don't // receive any work after between one and ten seconds. fn create_blocking_thread() { + // Check that thread is spawnable. + // If it hits to the OS limits don't spawn it. + { + let current_arc = POOL_SIZE.clone(); + let pool_size = *current_arc.lock().unwrap(); + if pool_size >= MAX_THREADS.load(Ordering::SeqCst) { + MAX_THREADS.store(10_000, Ordering::SeqCst); + return; + } + } // We want to avoid having all threads terminate at // exactly the same time, causing thundering herd // effects. We want to stagger their destruction over @@ -174,7 +199,7 @@ fn create_blocking_thread() { // Generate a simple random number of milliseconds let rand_sleep_ms = u64::from(random(10_000)); - thread::Builder::new() + let _ = thread::Builder::new() .name("async-blocking-driver-dynamic".to_string()) .spawn(move || { let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); @@ -192,7 +217,22 @@ fn create_blocking_thread() { *current_arc.lock().unwrap() -= 1; } }) - .expect("cannot start a dynamic thread driving blocking tasks"); + .map_err(|err| { + match err.kind() { + ErrorKind::WouldBlock => { + // Maximum allowed threads per process is varying from system to system. + // Some systems has it(like MacOS), some doesn't(Linux) + // This case expected to not happen. + // But when happened this shouldn't throw a panic. + let current_arc = POOL_SIZE.clone(); + MAX_THREADS.store(*current_arc.lock().unwrap() - 1, Ordering::SeqCst); + } + _ => eprintln!( + "cannot start a dynamic thread driving blocking tasks: {}", + err + ), + } + }); } // Enqueues work, attempting to send to the threadpool in a @@ -200,7 +240,7 @@ fn create_blocking_thread() { // based on the previous statistics without relying on // if there is not a thread ready to accept the work or not. fn schedule(t: async_task::Task<()>) { - // Add up for every incoming task schedule + // Add up for every incoming scheduled task FREQUENCY.fetch_add(1, Ordering::Acquire); if let Err(err) = POOL.sender.try_send(t) { diff --git a/tests/thread_pool.rs b/tests/thread_pool.rs index 64b81e6e3..d3933b776 100644 --- a/tests/thread_pool.rs +++ b/tests/thread_pool.rs @@ -103,7 +103,7 @@ fn longhauling_task_join() { let start = Instant::now(); // First batch of overhauling tasks - let handles = (0..100_000) + let _ = (0..100_000) .map(|_| { task::blocking::spawn(async { let duration = Duration::from_millis(1000); @@ -112,8 +112,6 @@ fn longhauling_task_join() { }) .collect::>>(); - task::block_on(join_all(handles)); - // Let them join to see how it behaves under different workloads. let duration = Duration::from_millis(thread_join_time_max); thread::sleep(duration); From 186d55ffa640d8ec8ca337572ef79e2918226283 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 02:33:06 +0200 Subject: [PATCH 14/20] Typos and table formatting --- src/task/blocking.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index d45c90428..6b5957b7c 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -96,9 +96,13 @@ lazy_static! { // This value represents the trend of tasks mapped onto the thread pool. // Calculation is following: // -// α :: EMA_COEFFICIENT :: smoothing factor between 0 and 1 -// Yt :: freq :: frequency sample at time t -// St :: acc :: EMA at time t +// +--------+-----------------+----------------------------------+ +// | Symbol | Identifier | Explanation | +// +--------+-----------------+----------------------------------+ +// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 | +// | Yt | freq | frequency sample at time t | +// | St | acc | EMA at time t | +// +--------+-----------------+----------------------------------+ // // Under these definitions formula is following: // EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St @@ -163,7 +167,7 @@ fn scale_pool() { { // Throughput is low. Allocate more threads to unblock flow. // If we fall to this case, scheduler is congested by longhauling tasks. - // For unblock the flow we should add up some threads to the pool, but not that much to + // For unblock the flow we should add up some threads to the pool, but not that many to // stagger the program's operation. let scale = LOW_WATERMARK * current_frequency + 1; @@ -221,8 +225,8 @@ fn create_blocking_thread() { match err.kind() { ErrorKind::WouldBlock => { // Maximum allowed threads per process is varying from system to system. - // Some systems has it(like MacOS), some doesn't(Linux) - // This case expected to not happen. + // Also, some systems have it(like macOS), and some don't(Linux). + // This case expected not to happen. // But when happened this shouldn't throw a panic. let current_arc = POOL_SIZE.clone(); MAX_THREADS.store(*current_arc.lock().unwrap() - 1, Ordering::SeqCst); @@ -235,7 +239,7 @@ fn create_blocking_thread() { }); } -// Enqueues work, attempting to send to the threadpool in a +// Enqueues work, attempting to send to the thread pool in a // nonblocking way and spinning up needed amount of threads // based on the previous statistics without relying on // if there is not a thread ready to accept the work or not. From e765cee30dbdab56db8c113c872a6c20e735ca1f Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 11:44:25 +0200 Subject: [PATCH 15/20] Doc comments for the module --- src/task/blocking.rs | 119 +++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 43 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 6b5957b7c..cd85f67c7 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -1,4 +1,30 @@ //! A thread pool for running blocking functions asynchronously. +//! +//! Blocking thread pool consists of four elements: +//! * Frequency Detector +//! * Trend Estimator +//! * Predictive Upscaler +//! * Time-based Downscaler +//! +//! ## Frequency Detector +//! Detects how many tasks are submitted from scheduler to thread pool in a given time frame. +//! Pool manager thread does this sampling every 200 milliseconds. +//! This value is going to be used for trend estimation phase. +//! +//! ## Trend Estimator +//! Hold up to the given number of frequencies to create an estimation. +//! This pool holds 10 frequencies at a time. +//! Estimation algorithm and prediction uses Exponentially Weighted Moving Average algorithm. +//! +//! Algorithm is altered and adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61). +//! +//! ## Predictive Upscaler +//! Selects upscaling amount based on estimation or when throughput hogs based on amount of tasks mapped. +//! Throughput hogs determined by a combination of job in / job out frequency and current scheduler task assignment frequency. +//! +//! ## Time-based Downscaler +//! After dynamic tasks spawned with upscaler they will continue working in between 1 second and 10 seconds. +//! When tasks are detached from the channels after this amount they join back. use std::collections::VecDeque; use std::fmt; @@ -16,35 +42,37 @@ use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; use std::sync::{Arc, Mutex}; -// Low watermark value, defines the bare minimum of the pool. -// Spawns initial thread set. +/// Low watermark value, defines the bare minimum of the pool. +/// Spawns initial thread set. const LOW_WATERMARK: u64 = 2; -// Pool managers interval time (milliseconds) -// This is the actual interval which makes adaptation calculation +/// Pool managers interval time (milliseconds). +/// This is the actual interval which makes adaptation calculation. const MANAGER_POLL_INTERVAL: u64 = 200; -// Frequency histogram's sliding window size -// Defines how many frequencies will be considered for adaptation. +/// Frequency histogram's sliding window size. +/// Defines how many frequencies will be considered for adaptation. const FREQUENCY_QUEUE_SIZE: usize = 10; -// Exponential moving average smoothing coefficient for limited window -// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size. +/// Exponential moving average smoothing coefficient for limited window. +/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size. const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64); -// Pool task frequency variable -// Holds scheduled tasks onto the thread pool for the calculation window +/// Pool task frequency variable. +/// Holds scheduled tasks onto the thread pool for the calculation window. static FREQUENCY: AtomicU64 = AtomicU64::new(0); -// Possible max threads (without OS contract) +/// Possible max threads (without OS contract). static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000); +/// Pool interface between the scheduler and thread pool struct Pool { sender: Sender>, receiver: Receiver>, } lazy_static! { + /// Blocking pool with static starting thread count. static ref POOL: Pool = { for _ in 0..LOW_WATERMARK { thread::Builder::new() @@ -81,31 +109,36 @@ lazy_static! { Pool { sender, receiver } }; - // Pool task frequency calculation variables + /// Sliding window for pool task frequency calculation static ref FREQ_QUEUE: Arc>> = { Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1))) }; - // Pool size variable + /// Dynamic pool thread count variable static ref POOL_SIZE: Arc> = Arc::new(Mutex::new(LOW_WATERMARK)); } -// Exponentially Weighted Moving Average calculation -// -// This allows us to find the EMA value. -// This value represents the trend of tasks mapped onto the thread pool. -// Calculation is following: -// -// +--------+-----------------+----------------------------------+ -// | Symbol | Identifier | Explanation | -// +--------+-----------------+----------------------------------+ -// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 | -// | Yt | freq | frequency sample at time t | -// | St | acc | EMA at time t | -// +--------+-----------------+----------------------------------+ -// -// Under these definitions formula is following: -// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St +/// Exponentially Weighted Moving Average calculation +/// +/// This allows us to find the EMA value. +/// This value represents the trend of tasks mapped onto the thread pool. +/// Calculation is following: +/// ``` +/// +--------+-----------------+----------------------------------+ +/// | Symbol | Identifier | Explanation | +/// +--------+-----------------+----------------------------------+ +/// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 | +/// | Yt | freq | frequency sample at time t | +/// | St | acc | EMA at time t | +/// +--------+-----------------+----------------------------------+ +/// ``` +/// Under these definitions formula is following: +/// ``` +/// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St +/// ``` +/// # Arguments +/// +/// * `freq_queue` - Sliding window of frequency samples #[inline] fn calculate_ema(freq_queue: &VecDeque) -> f64 { freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| { @@ -113,13 +146,13 @@ fn calculate_ema(freq_queue: &VecDeque) -> f64 { }) * EMA_COEFFICIENT as f64 } -// Adaptive pool scaling function -// -// This allows to spawn new threads to make room for incoming task pressure. -// Works in the background detached from the pool system and scales up the pool based -// on the request rate. -// -// It uses frequency based calculation to define work. Utilizing average processing rate. +/// Adaptive pool scaling function +/// +/// This allows to spawn new threads to make room for incoming task pressure. +/// Works in the background detached from the pool system and scales up the pool based +/// on the request rate. +/// +/// It uses frequency based calculation to define work. Utilizing average processing rate. fn scale_pool() { // Fetch current frequency, it does matter that operations are ordered in this approach. let current_frequency = FREQUENCY.load(Ordering::SeqCst); @@ -180,9 +213,9 @@ fn scale_pool() { FREQUENCY.store(0, Ordering::Release); } -// Creates yet another thread to receive tasks. -// Dynamic threads will terminate themselves if they don't -// receive any work after between one and ten seconds. +/// Creates blocking thread to receive tasks +/// Dynamic threads will terminate themselves if they don't +/// receive any work after between one and ten seconds. fn create_blocking_thread() { // Check that thread is spawnable. // If it hits to the OS limits don't spawn it. @@ -239,10 +272,10 @@ fn create_blocking_thread() { }); } -// Enqueues work, attempting to send to the thread pool in a -// nonblocking way and spinning up needed amount of threads -// based on the previous statistics without relying on -// if there is not a thread ready to accept the work or not. +/// Enqueues work, attempting to send to the thread pool in a +/// nonblocking way and spinning up needed amount of threads +/// based on the previous statistics without relying on +/// if there is not a thread ready to accept the work or not. fn schedule(t: async_task::Task<()>) { // Add up for every incoming scheduled task FREQUENCY.fetch_add(1, Ordering::Acquire); From 234025097b66b7517180274fee93af5eaf1d7f5a Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 12:43:58 +0200 Subject: [PATCH 16/20] Make no format block text --- src/task/blocking.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index cd85f67c7..b4ce948e3 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -123,7 +123,7 @@ lazy_static! { /// This allows us to find the EMA value. /// This value represents the trend of tasks mapped onto the thread pool. /// Calculation is following: -/// ``` +/// ```text /// +--------+-----------------+----------------------------------+ /// | Symbol | Identifier | Explanation | /// +--------+-----------------+----------------------------------+ @@ -133,7 +133,7 @@ lazy_static! { /// +--------+-----------------+----------------------------------+ /// ``` /// Under these definitions formula is following: -/// ``` +/// ```text /// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St /// ``` /// # Arguments From 572b80fadb52fc7c24a0c38307a3dbc87602edc8 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 16:09:19 +0200 Subject: [PATCH 17/20] Refactor doc comments --- src/task/blocking.rs | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index b4ce948e3..e910d41e6 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -13,18 +13,43 @@ //! //! ## Trend Estimator //! Hold up to the given number of frequencies to create an estimation. -//! This pool holds 10 frequencies at a time. +//! Trend estimator holds 10 frequencies at a time. +//! This value is stored as constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html). //! Estimation algorithm and prediction uses Exponentially Weighted Moving Average algorithm. //! -//! Algorithm is altered and adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61). +//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61) +//! and altered to: +//! * use instead of heavy calculation of trend, utilize thread redundancy which is the sum of the differences between the predicted and observed value. +//! * use instead of linear trend estimation, it uses exponential trend estimation where formula is: +//! ```text +//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK +//! ``` +//! *NOTE:* If this algorithm wants to be tweaked increasing [LOW_WATERMARK](constant.LOW_WATERMARK.html) will automatically adapt the additional dynamic thread spawn count +//! * operate without watermarking by timestamps (in paper which is used to measure algorithms own performance during the execution) +//! * operate extensive subsampling. Extensive subsampling congests the pool manager thread. +//! * operate without keeping track of idle time of threads or job out queue like TEMA and FOPS implementations. //! //! ## Predictive Upscaler -//! Selects upscaling amount based on estimation or when throughput hogs based on amount of tasks mapped. +//! Upscaler has three cases (also can be seen in paper): +//! * The rate slightly increases and there are many idle threads. +//! * The number of worker threads tends to be reduced since the workload of the system is descending. +//! * The system has no request or stalled. (Our case here is when the current tasks block further tasks from being processed – throughput hogs) +//! +//! For the first two EMA calculation and exponential trend estimation gives good performance. +//! For the last case, upscaler selects upscaling amount by amount of tasks mapped when throughput hogs happen. +//! +//! **example scenario:** Let's say we have 10_000 tasks where every one of them is blocking for 1 second. Scheduler will map plenty of tasks but will got rejected. +//! This makes estimation calculation nearly 0 for both entering and exiting parts. When this happens and we still see tasks mapped from scheduler. +//! We start to slowly increase threads by amount of frequency linearly. High increase of this value either make us hit to the thread threshold on +//! some OS or make congestion on the other thread utilizations of the program, because of context switch. +//! //! Throughput hogs determined by a combination of job in / job out frequency and current scheduler task assignment frequency. +//! Threshold of EMA difference is eluded by machine epsilon for floating point arithmetic errors. //! //! ## Time-based Downscaler -//! After dynamic tasks spawned with upscaler they will continue working in between 1 second and 10 seconds. -//! When tasks are detached from the channels after this amount they join back. +//! When threads becomes idle, they will not shut down immediately. +//! Instead, they wait a random amount between 1 and 11 seconds +//! to even out the load. use std::collections::VecDeque; use std::fmt; @@ -59,7 +84,7 @@ const FREQUENCY_QUEUE_SIZE: usize = 10; const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64); /// Pool task frequency variable. -/// Holds scheduled tasks onto the thread pool for the calculation window. +/// Holds scheduled tasks onto the thread pool for the calculation time window. static FREQUENCY: AtomicU64 = AtomicU64::new(0); /// Possible max threads (without OS contract). From 6664f4b1ae9dbf9905506a42f3ca291e9017aa17 Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 22:01:51 +0200 Subject: [PATCH 18/20] Removal of Arc --- src/task/blocking.rs | 33 +++++++++++---------------------- tests/thread_pool.rs | 6 ------ 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index e910d41e6..cbfb20c01 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -65,7 +65,7 @@ use crate::future::Future; use crate::io::ErrorKind; use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; /// Low watermark value, defines the bare minimum of the pool. /// Spawns initial thread set. @@ -135,12 +135,12 @@ lazy_static! { }; /// Sliding window for pool task frequency calculation - static ref FREQ_QUEUE: Arc>> = { - Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1))) + static ref FREQ_QUEUE: Mutex> = { + Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1)) }; /// Dynamic pool thread count variable - static ref POOL_SIZE: Arc> = Arc::new(Mutex::new(LOW_WATERMARK)); + static ref POOL_SIZE: Mutex = Mutex::new(LOW_WATERMARK); } /// Exponentially Weighted Moving Average calculation @@ -180,9 +180,8 @@ fn calculate_ema(freq_queue: &VecDeque) -> f64 { /// It uses frequency based calculation to define work. Utilizing average processing rate. fn scale_pool() { // Fetch current frequency, it does matter that operations are ordered in this approach. - let current_frequency = FREQUENCY.load(Ordering::SeqCst); - let freq_queue_arc = FREQ_QUEUE.clone(); - let mut freq_queue = freq_queue_arc.lock().unwrap(); + let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst); + let mut freq_queue = FREQ_QUEUE.lock().unwrap(); // Make it safe to start for calculations by adding initial frequency scale if freq_queue.len() == 0 { @@ -227,15 +226,13 @@ fn scale_pool() { // If we fall to this case, scheduler is congested by longhauling tasks. // For unblock the flow we should add up some threads to the pool, but not that many to // stagger the program's operation. - let scale = LOW_WATERMARK * current_frequency + 1; + let scale = ((current_frequency as f64).powf(LOW_WATERMARK as f64) + 1_f64) as u64; // Scale it up! (0..scale).for_each(|_| { create_blocking_thread(); }); } - - FREQUENCY.store(0, Ordering::Release); } /// Creates blocking thread to receive tasks @@ -245,8 +242,7 @@ fn create_blocking_thread() { // Check that thread is spawnable. // If it hits to the OS limits don't spawn it. { - let current_arc = POOL_SIZE.clone(); - let pool_size = *current_arc.lock().unwrap(); + let pool_size = *POOL_SIZE.lock().unwrap(); if pool_size >= MAX_THREADS.load(Ordering::SeqCst) { MAX_THREADS.store(10_000, Ordering::SeqCst); return; @@ -267,17 +263,11 @@ fn create_blocking_thread() { let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); // Adjust the pool size counter before and after spawn - { - let current_arc = POOL_SIZE.clone(); - *current_arc.lock().unwrap() += 1; - } + *POOL_SIZE.lock().unwrap() += 1; while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { abort_on_panic(|| task.run()); } - { - let current_arc = POOL_SIZE.clone(); - *current_arc.lock().unwrap() -= 1; - } + *POOL_SIZE.lock().unwrap() -= 1; }) .map_err(|err| { match err.kind() { @@ -286,8 +276,7 @@ fn create_blocking_thread() { // Also, some systems have it(like macOS), and some don't(Linux). // This case expected not to happen. // But when happened this shouldn't throw a panic. - let current_arc = POOL_SIZE.clone(); - MAX_THREADS.store(*current_arc.lock().unwrap() - 1, Ordering::SeqCst); + MAX_THREADS.store(*POOL_SIZE.lock().unwrap() - 1, Ordering::SeqCst); } _ => eprintln!( "cannot start a dynamic thread driving blocking tasks: {}", diff --git a/tests/thread_pool.rs b/tests/thread_pool.rs index d3933b776..29a85c9f3 100644 --- a/tests/thread_pool.rs +++ b/tests/thread_pool.rs @@ -44,9 +44,7 @@ fn slow_join() { let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128; println!("Slow task join. Monotonic exec time: {:?} ns", elapsed); - // Should be less than 25_000 ns // Previous implementation is around this threshold. - assert_eq!(elapsed < 25_000, true); } // Test for slow joins with task burst. @@ -89,9 +87,7 @@ fn slow_join_interrupted() { let elapsed = start.elapsed().as_millis() - thread_join_time_max as u128; println!("Slow task join. Monotonic exec time: {:?} ns", elapsed); - // Should be less than 25_000 ns // Previous implementation is around this threshold. - assert_eq!(elapsed < 25_000, true); } // This test is expensive but it proves that longhauling tasks are working in adaptive thread pool. @@ -135,7 +131,5 @@ fn longhauling_task_join() { elapsed ); - // Should be less than 200_000 ns // Previous implementation will panic when this test is running. - assert_eq!(elapsed < 200_000, true); } From e934a5e23786814695c706f8aa8eb999aba2b2ab Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Fri, 30 Aug 2019 22:20:29 +0200 Subject: [PATCH 19/20] Safe arithmetic ops --- src/task/blocking.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index cbfb20c01..5b566b54e 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -136,7 +136,7 @@ lazy_static! { /// Sliding window for pool task frequency calculation static ref FREQ_QUEUE: Mutex> = { - Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1)) + Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1))) }; /// Dynamic pool thread count variable @@ -196,7 +196,7 @@ fn scale_pool() { // Add seen frequency data to the frequency histogram. freq_queue.push_back(frequency); - if freq_queue.len() == FREQUENCY_QUEUE_SIZE + 1 { + if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) { freq_queue.pop_front(); } @@ -255,12 +255,14 @@ fn create_blocking_thread() { // background noise. // // Generate a simple random number of milliseconds - let rand_sleep_ms = u64::from(random(10_000)); + let rand_sleep_ms = 1000_u64 + .checked_add(u64::from(random(10_000))) + .expect("shouldn't overflow"); let _ = thread::Builder::new() .name("async-blocking-driver-dynamic".to_string()) .spawn(move || { - let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); + let wait_limit = Duration::from_millis(rand_sleep_ms); // Adjust the pool size counter before and after spawn *POOL_SIZE.lock().unwrap() += 1; @@ -276,7 +278,12 @@ fn create_blocking_thread() { // Also, some systems have it(like macOS), and some don't(Linux). // This case expected not to happen. // But when happened this shouldn't throw a panic. - MAX_THREADS.store(*POOL_SIZE.lock().unwrap() - 1, Ordering::SeqCst); + let guarded_count = POOL_SIZE + .lock() + .unwrap() + .checked_sub(1) + .expect("shouldn't underflow"); + MAX_THREADS.store(guarded_count, Ordering::SeqCst); } _ => eprintln!( "cannot start a dynamic thread driving blocking tasks: {}", From 405b081911cdace72a9da31684e34202fe6da0ea Mon Sep 17 00:00:00 2001 From: Mahmut Bulut Date: Mon, 2 Sep 2019 00:00:19 +0200 Subject: [PATCH 20/20] Threshold fixes for congestion case --- src/task/blocking.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 5b566b54e..a91a51188 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -213,7 +213,8 @@ fn scale_pool() { // "Scale by" amount can be seen as "how much load is coming". // "Scale" amount is "how many threads we should spawn". let scale_by: f64 = curr_ema_frequency - prev_ema_frequency; - let scale = ((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as u64; + let scale = num_cpus::get() + .min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize); // It is time to scale the pool! (0..scale).for_each(|_| { @@ -226,10 +227,7 @@ fn scale_pool() { // If we fall to this case, scheduler is congested by longhauling tasks. // For unblock the flow we should add up some threads to the pool, but not that many to // stagger the program's operation. - let scale = ((current_frequency as f64).powf(LOW_WATERMARK as f64) + 1_f64) as u64; - - // Scale it up! - (0..scale).for_each(|_| { + (0..LOW_WATERMARK).for_each(|_| { create_blocking_thread(); }); }