Skip to content

Commit

Permalink
Merge pull request #303 from Kuadrant/more_write_behind
Browse files Browse the repository at this point in the history
More write behind
  • Loading branch information
alexsnaps authored Apr 30, 2024
2 parents 9f54a24 + c5212cf commit 31ee24c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 130 deletions.
253 changes: 126 additions & 127 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::Notify;
use tokio::time::interval;

pub struct CachedCounterValue {
value: AtomicExpiringValue,
Expand All @@ -23,107 +22,6 @@ pub struct CachedCounterValue {
from_authority: AtomicBool,
}

pub struct Batcher {
updates: DashMap<Counter, Arc<CachedCounterValue>>,
notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl Batcher {
fn new(period: Duration) -> Self {
Self {
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}

pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut interval = interval(self.interval);
let mut ready = self.updates.len() >= min;
loop {
if ready {
let mut batch = Vec::with_capacity(min);
for entry in &self.updates {
if entry.value().requires_fast_flush(&self.interval) {
batch.push(entry.key().clone());
if batch.len() == min {
break;
}
}
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = self.updates.iter().take(remaining);
batch.append(&mut take.map(|e| e.key().clone()).collect());
}
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
for counter in &batch {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
}
return result;
} else {
ready = select! {
_ = self.notifier.notified() => {
self.updates.len() >= min ||
self.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
},
_ = interval.tick() => true,
}
}
}
}

pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
let priority = value.requires_fast_flush(&self.interval);
match self.updates.entry(counter.clone()) {
Entry::Occupied(needs_merge) => {
let arc = needs_merge.get();
if !Arc::ptr_eq(arc, &value) {
arc.delta(&counter, value.pending_writes().unwrap());
}
}
Entry::Vacant(miss) => {
miss.insert_entry(value);
}
};
if priority {
self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}
}

impl Default for Batcher {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}

pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
}

impl CachedCounterValue {
pub fn from_authority(counter: &Counter, value: i64, ttl: Duration) -> Self {
let now = SystemTime::now();
Expand Down Expand Up @@ -229,46 +127,107 @@ impl CachedCounterValue {
}
}

pub struct CountersCacheBuilder {
max_cached_counters: usize,
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
pub struct Batcher {
updates: DashMap<Counter, Arc<CachedCounterValue>>,
notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl CountersCacheBuilder {
pub fn new() -> Self {
impl Batcher {
fn new(period: Duration) -> Self {
Self {
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn max_cached_counters(mut self, max_cached_counters: usize) -> Self {
self.max_cached_counters = max_cached_counters;
self
pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
let priority = value.requires_fast_flush(&self.interval);
match self.updates.entry(counter.clone()) {
Entry::Occupied(needs_merge) => {
let arc = needs_merge.get();
if !Arc::ptr_eq(arc, &value) {
arc.delta(&counter, value.pending_writes().unwrap());
}
}
Entry::Vacant(miss) => {
miss.insert_entry(value);
}
};
if priority {
self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}

pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
self.max_ttl_cached_counters = max_ttl_cached_counter;
self
pub async fn consume<F, Fut, O>(&self, max: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut ready = self.batch_ready(max);
loop {
if ready {
let mut batch = Vec::with_capacity(max);
batch.extend(
self.updates
.iter()
.filter(|entry| entry.value().requires_fast_flush(&self.interval))
.take(max)
.map(|e| e.key().clone()),
);
if let Some(remaining) = max.checked_sub(batch.len()) {
batch.extend(self.updates.iter().take(remaining).map(|e| e.key().clone()));
}
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
batch.iter().for_each(|counter| {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
});
return result;
} else {
ready = select! {
_ = self.notifier.notified() => self.batch_ready(max),
_ = tokio::time::sleep(self.interval) => true,
}
}
}
}

pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
self
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
}
fn batch_ready(&self, size: usize) -> bool {
self.updates.len() >= size
|| self
.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
}
}

impl Default for Batcher {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}

pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
}

impl CountersCache {
pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
let option = self.cache.get(counter);
Expand Down Expand Up @@ -383,6 +342,46 @@ impl CountersCache {
}
}

pub struct CountersCacheBuilder {
max_cached_counters: usize,
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
}

impl CountersCacheBuilder {
pub fn new() -> Self {
Self {
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
}
}

pub fn max_cached_counters(mut self, max_cached_counters: usize) -> Self {
self.max_cached_counters = max_cached_counters;
self
}

pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
self.max_ttl_cached_counters = max_ttl_cached_counter;
self
}

pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
self
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
8 changes: 6 additions & 2 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,11 @@ async fn update_counters<C: ConnectionLike>(
let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

let mut res: Vec<(Counter, i64, i64)> = Vec::new();
let mut res: Vec<(Counter, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());
if counters_and_deltas.is_empty() {
return Ok(res);
}

for (counter, delta) in counters_and_deltas {
let delta = delta.pending_writes().expect("State machine is wrong!");
if delta > 0 {
Expand Down Expand Up @@ -339,7 +343,7 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
} else {
let updated_counters = cached_counters
.batcher()
.consume(1, |counters| update_counters(&mut redis_conn, counters))
.consume(100, |counters| update_counters(&mut redis_conn, counters))
.await
.or_else(|err| {
if err.is_transient() {
Expand Down
2 changes: 1 addition & 1 deletion limitador/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ mod test {
}

// We wait for the flushing period to pass so the counters are flushed in the cached storage
tokio::time::sleep(Duration::from_millis(3)).await;
tokio::time::sleep(Duration::from_millis(4)).await;

assert!(rate_limiter
.is_rate_limited(namespace, &get_values, 1)
Expand Down

0 comments on commit 31ee24c

Please sign in to comment.