Merge pull request #299 from Kuadrant/write-behind
Support pending writes within CachedCounterValue
didierofrivia authored Apr 30, 2024
2 parents b7c748a + 2b22301 commit 9f54a24
Showing 4 changed files with 254 additions and 174 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions limitador/Cargo.toml
@@ -22,6 +22,7 @@ lenient_conditions = []

[dependencies]
moka = { version = "0.12", features = ["sync"] }
dashmap = "5.5.3"
getrandom = { version = "0.2", features = ["js"] }
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
229 changes: 212 additions & 17 deletions limitador/src/storage/redis/counters_cache.rs
@@ -4,27 +4,147 @@ use crate::storage::redis::{
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use moka::sync::Cache;
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::Notify;
use tokio::time::interval;

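/// Counter value held in the local cache. `initial_value` remembers the value
/// last confirmed by the authoritative store (Redis), so the delta still
/// pending a flush can be derived; `from_authority` tracks whether the current
/// value originated from Redis or is a temporary local placeholder.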
pub struct CachedCounterValue {
value: AtomicExpiringValue,
initial_value: AtomicI64,
expiry: AtomicExpiryTime,
from_authority: AtomicBool,
}

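/// Queues pending counter updates and hands them to a consumer in batches,
/// once enough updates have accumulated, the flush interval has elapsed, or a
/// priority flush has been requested.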
pub struct Batcher {
updates: DashMap<Counter, Arc<CachedCounterValue>>,
notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl Batcher {
fn new(period: Duration) -> Self {
Self {
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}

pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut interval = interval(self.interval);
let mut ready = self.updates.len() >= min;
loop {
if ready {
let mut batch = Vec::with_capacity(min);
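// Collect entries flagged for a fast flush first, then top the batch up
// with whatever else is queued.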
for entry in &self.updates {
if entry.value().requires_fast_flush(&self.interval) {
batch.push(entry.key().clone());
if batch.len() == min {
break;
}
}
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = self.updates.iter().take(remaining);
batch.append(&mut take.map(|e| e.key().clone()).collect());
}
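// Hand a snapshot of the batched entries to the consumer, then drop only
// the entries that accumulated no further writes while the flush ran.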
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
for counter in &batch {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
}
return result;
} else {
ready = select! {
_ = self.notifier.notified() => {
self.updates.len() >= min ||
self.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
},
_ = interval.tick() => true,
}
}
}
}

pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
let priority = value.requires_fast_flush(&self.interval);
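// Merge into an already queued entry unless it is the very same Arc,
// in which case it already carries this update.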
match self.updates.entry(counter.clone()) {
Entry::Occupied(needs_merge) => {
let arc = needs_merge.get();
if !Arc::ptr_eq(arc, &value) {
arc.delta(&counter, value.pending_writes().unwrap());
}
}
Entry::Vacant(miss) => {
miss.insert_entry(value);
}
};
if priority {
self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}
}

impl Default for Batcher {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}

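/// Write-behind cache of counters: reads are served from the local cache while
/// writes are queued in the batcher to be flushed to the authoritative store.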
pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
}

impl CachedCounterValue {
pub fn from(counter: &Counter, value: i64, ttl: Duration) -> Self {
pub fn from_authority(counter: &Counter, value: i64, ttl: Duration) -> Self {
let now = SystemTime::now();
Self {
value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())),
initial_value: AtomicI64::new(value),
expiry: AtomicExpiryTime::from_now(ttl),
from_authority: AtomicBool::new(true),
}
}

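/// Builds a placeholder entry from a locally observed value; the authoritative
/// count still needs to be fetched as soon as possible.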
pub fn load_from_authority_asap(counter: &Counter, temp_value: i64) -> Self {
let now = SystemTime::now();
Self {
value: AtomicExpiringValue::new(
temp_value,
now + Duration::from_secs(counter.seconds()),
),
initial_value: AtomicI64::new(temp_value),
expiry: AtomicExpiryTime::from_now(Duration::from_secs(counter.seconds())),
from_authority: AtomicBool::new(false),
}
}

@@ -34,13 +154,58 @@ impl CachedCounterValue {

pub fn set_from_authority(&self, counter: &Counter, value: i64, expiry: Duration) {
let time_window = Duration::from_secs(counter.seconds());
self.initial_value.store(value, Ordering::SeqCst);
self.value.set(value, time_window);
self.expiry.update(expiry);
self.from_authority.store(true, Ordering::Release);
}

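/// Applies a local increment and returns the updated value for the current window.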
pub fn delta(&self, counter: &Counter, delta: i64) -> i64 {
self.value
.update(delta, counter.seconds(), SystemTime::now())
let value = self
.value
.update(delta, counter.seconds(), SystemTime::now());
if value == delta {
// new window, invalidate initial value
self.initial_value.store(0, Ordering::SeqCst);
}
value
}

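/// Returns the writes accumulated since the last sync with the authority and
/// marks them as flushed; Err(()) signals a concurrent call raced this one.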
pub fn pending_writes(&self) -> Result<i64, ()> {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
let offset = if start == 0 {
value
} else {
let writes = value - start;
if writes > 0 {
writes
} else {
value
}
};
match self
.initial_value
.compare_exchange(start, value, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => Ok(offset),
Err(newer) => {
if newer == 0 {
// We got expired in the meantime, this fresh value can wait the next iteration
Ok(0)
} else {
// Concurrent call to this method?
// We could support that with a CAS loop in the future if needed
Err(())
}
}
}
}

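/// True when the cached value matches what was last synced, i.e. nothing is
/// waiting to be flushed.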
fn no_pending_writes(&self) -> bool {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
value - start == 0
}

pub fn hits(&self, _: &Counter) -> i64 {
Expand All @@ -58,6 +223,10 @@ impl CachedCounterValue {
pub fn to_next_window(&self) -> Duration {
self.value.ttl()
}

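/// Whether this entry should be flushed ahead of the regular batching interval,
/// based on where the value came from and how close the window is to expiring.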
pub fn requires_fast_flush(&self, within: &Duration) -> bool {
self.from_authority.load(Ordering::Acquire) || &self.value.ttl() <= within
}
}

pub struct CountersCacheBuilder {
@@ -90,18 +259,31 @@ impl CountersCacheBuilder {
self
}

pub fn build(&self) -> CountersCache {
pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
}
}
}

impl CountersCache {
pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
self.cache.get(counter)
let option = self.cache.get(counter);
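// On a cache miss, fall back to the batcher's pending updates so queued
// writes are not lost, and promote the entry back into the cache.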
if option.is_none() {
let from_queue = self.batcher.updates.get(counter);
if let Some(entry) = from_queue {
self.cache.insert(counter.clone(), entry.value().clone());
return Some(entry.value().clone());
}
}
option
}

pub fn batcher(&self) -> &Batcher {
&self.batcher
}

pub fn insert(
@@ -122,25 +304,38 @@ impl CountersCache {
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
if ttl > Duration::ZERO {
let previous = self.cache.get_with(counter.clone(), || {
Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl))
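// Reuse a value already queued in the batcher instead of creating a new one.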
if let Some(entry) = self.batcher.updates.get(&counter) {
entry.value().clone()
} else {
Arc::new(CachedCounterValue::from_authority(
&counter,
counter_val,
ttl,
))
}
});
if previous.expired_at(now) || previous.value.value() < counter_val {
previous.set_from_authority(&counter, counter_val, cache_ttl);
previous.set_from_authority(&counter, counter_val, ttl);
}
return previous;
}
}
Arc::new(CachedCounterValue::from(
Arc::new(CachedCounterValue::load_from_authority_asap(
&counter,
counter_val,
Duration::ZERO,
))
}

pub fn increase_by(&self, counter: &Counter, delta: i64) {
if let Some(val) = self.cache.get(counter) {
val.delta(counter, delta);
};
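// Make sure the counter is cached (reusing any queued entry), apply the
// delta locally, and enqueue it for a write-behind flush.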
let val = self.cache.get_with_by_ref(counter, || {
if let Some(entry) = self.batcher.updates.get(counter) {
entry.value().clone()
} else {
Arc::new(CachedCounterValue::load_from_authority_asap(counter, 0))
}
});
val.delta(counter, delta);
self.batcher.add(counter.clone(), val.clone());
}

fn ttl_from_redis_ttl(
@@ -209,7 +404,7 @@ mod tests {
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(10),
@@ -236,7 +431,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());

assert!(cache.get(&counter).is_none());
}
@@ -258,7 +453,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_value),
@@ -289,7 +484,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
None,
@@ -318,7 +513,7 @@
values,
);

let cache = CountersCacheBuilder::new().build();
let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_val),
