Skip to content

Commit

Permalink
Merge pull request #316 from Kuadrant/cached_delta_updates
Browse files Browse the repository at this point in the history
Cached delta updates
  • Loading branch information
alexsnaps authored May 8, 2024
2 parents 6073e60 + 5a3a2a0 commit 1a5a24d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 118 deletions.
6 changes: 6 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ impl AtomicExpiringValue {
self.value_at(SystemTime::now())
}

#[allow(dead_code)] // NOTE(review): presumably kept for callers behind a feature/cfg — confirm why the lint is silenced
/// Unconditionally pushes the expiry out to `expiry` and adds `delta` to the
/// stored value, returning the value after the addition.
///
/// Unlike `update`, this never resets the value on expiry: it always
/// accumulates. The two operations are individually atomic but not atomic as
/// a pair — a concurrent reader may observe the new expiry with the old value
/// (or vice versa).
pub fn add_and_set_expiry(&self, delta: i64, expiry: Duration) -> i64 {
    self.expiry.update(expiry);
    // fetch_add returns the previous value, so add delta again to report the
    // post-update value to the caller.
    self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

pub fn update(&self, delta: i64, ttl: u64, when: SystemTime) -> i64 {
if self.expiry.update_if_expired(ttl, when) {
self.value.store(delta, Ordering::SeqCst);
Expand Down
94 changes: 33 additions & 61 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ impl CachedCounterValue {
self.expiry.expired_at(now)
}

pub fn set_from_authority(&self, counter: &Counter, value: i64, expiry: Duration) {
let time_window = Duration::from_secs(counter.seconds());
self.initial_value.store(value, Ordering::SeqCst);
self.value.set(value, time_window);
/// Folds a delta reported by the authoritative store (Redis) into the cached
/// counter, without discarding increments accumulated locally since the last
/// flush.
///
/// Both the live value and `initial_value` (the baseline used by
/// `pending_writes_and_value`) are advanced by the same `delta`, so the
/// pending-writes offset (value - baseline) is unchanged by this call.
pub fn add_from_authority(&self, delta: i64, expiry: Duration) {
    self.value.add_and_set_expiry(delta, expiry);
    self.initial_value.fetch_add(delta, Ordering::SeqCst);
    // self.expiry is a separate field from the inner value's expiry updated
    // above; both are refreshed to the authority-provided TTL.
    self.expiry.update(expiry);
    // Release store publishes the updates above to readers that
    // Acquire-load this flag.
    self.from_authority.store(true, Ordering::Release);
}
Expand All @@ -73,6 +72,10 @@ impl CachedCounterValue {
}

/// Convenience wrapper around `pending_writes_and_value` that reports only
/// the locally accumulated, not-yet-flushed increments.
pub fn pending_writes(&self) -> Result<i64, ()> {
    let (writes, _current_value) = self.pending_writes_and_value()?;
    Ok(writes)
}

pub fn pending_writes_and_value(&self) -> Result<(i64, i64), ()> {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
let offset = if start == 0 {
Expand All @@ -91,11 +94,11 @@ impl CachedCounterValue {
.initial_value
.compare_exchange(start, value, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => Ok(offset),
Ok(_) => Ok((offset, value)),
Err(newer) => {
if newer == 0 {
// We got reset because of expiry, this fresh value can wait the next iteration
Ok(0)
Ok((0, 0))
} else {
// Concurrent call to this method?
// We could support that with a CAS loop in the future if needed
Expand Down Expand Up @@ -252,43 +255,41 @@ impl CountersCache {
&self.batcher
}

pub fn insert(
pub fn apply_remote_delta(
&self,
counter: Counter,
redis_val: Option<i64>,
redis_val: i64,
remote_deltas: i64,
redis_ttl_ms: i64,
ttl_margin: Duration,
now: SystemTime,
) -> Arc<CachedCounterValue> {
let counter_val = redis_val.unwrap_or(0);
let cache_ttl = self.ttl_from_redis_ttl(
redis_ttl_ms,
counter.seconds(),
counter_val,
redis_val,
counter.max_value(),
);
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
if ttl > Duration::ZERO {
let previous = self.cache.get_with(counter.clone(), || {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
entry.value().clone()
let cached_value = entry.value();
cached_value.add_from_authority(remote_deltas, ttl);
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(
&counter,
counter_val,
ttl,
))
Arc::new(CachedCounterValue::from_authority(&counter, redis_val, ttl))
}
});
if previous.expired_at(now) || previous.value.value() < counter_val {
previous.set_from_authority(&counter, counter_val, ttl);
if from_cache {
cached.add_from_authority(remote_deltas, ttl);
}
return previous;
return cached;
}
}
Arc::new(CachedCounterValue::load_from_authority_asap(
&counter,
counter_val,
&counter, redis_val,
))
}

Expand Down Expand Up @@ -430,14 +431,14 @@ mod tests {
}

#[test]
fn setting_from_auth_resets_pending_writes() {
fn adding_from_auth_not_affecting_pending_writes() {
let counter = test_counter(10, None);
let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1));
value.delta(&counter, 5);
assert!(value.no_pending_writes().not());
value.set_from_authority(&counter, 6, Duration::from_secs(1));
assert!(value.no_pending_writes());
assert_eq!(value.pending_writes(), Ok(0));
value.add_from_authority(6, Duration::from_secs(1));
assert!(value.no_pending_writes().not());
assert_eq!(value.pending_writes(), Ok(5));
}

#[test]
Expand Down Expand Up @@ -623,13 +624,7 @@ mod tests {
let counter = test_counter(10, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(10),
10,
Duration::from_secs(0),
SystemTime::now(),
);
cache.apply_remote_delta(counter.clone(), 10, 0, 10, Duration::from_secs(0));

assert!(cache.get(&counter).is_some());
}
Expand All @@ -650,12 +645,12 @@ mod tests {
let counter = test_counter(max_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
cache.apply_remote_delta(
counter.clone(),
Some(current_value),
current_value,
0,
10,
Duration::from_secs(0),
SystemTime::now(),
);

assert_eq!(
Expand All @@ -664,37 +659,14 @@ mod tests {
);
}

#[test]
// Exercises the pre-change `insert` API: a Redis miss (`None`) must be cached
// as a zero-valued counter, so a subsequent `get` reports 0 hits.
fn insert_saves_zero_when_redis_val_is_none() {
let max_val = 10;
let counter = test_counter(max_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
None, // no authoritative value available from Redis
10,
Duration::from_secs(0),
SystemTime::now(),
);

assert_eq!(cache.get(&counter).map(|e| e.hits(&counter)).unwrap(), 0);
}

#[test]
fn increase_by() {
let current_val = 10;
let increase_by = 8;
let counter = test_counter(current_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_val),
10,
Duration::from_secs(0),
SystemTime::now(),
);
cache.apply_remote_delta(counter.clone(), current_val, 0, 10, Duration::from_secs(0));
cache.increase_by(&counter, increase_by);

assert_eq!(
Expand Down
118 changes: 61 additions & 57 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,43 +291,48 @@ impl CachedRedisStorageBuilder {
async fn update_counters<C: ConnectionLike>(
redis_conn: &mut C,
counters_and_deltas: HashMap<Counter, Arc<CachedCounterValue>>,
) -> Result<Vec<(Counter, i64, i64)>, StorageErr> {
) -> Result<Vec<(Counter, i64, i64, i64)>, StorageErr> {
let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

let mut res: Vec<(Counter, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());
if counters_and_deltas.is_empty() {
return Ok(res);
}

for (counter, value) in counters_and_deltas {
let delta = value.pending_writes().expect("State machine is wrong!");
if delta > 0 {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(delta);
// We need to store the counter in the actual order we are sending it to the script
res.push((counter, 0, 0));
let res = if counters_and_deltas.is_empty() {
Default::default()
} else {
let mut res: Vec<(Counter, i64, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());

for (counter, value) in counters_and_deltas {
let (delta, last_value_from_redis) = value
.pending_writes_and_value()
.expect("State machine is wrong!");
if delta > 0 {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(delta);
// We need to store the counter in the actual order we are sending it to the script
res.push((counter, 0, last_value_from_redis, 0));
}
}
}

let span = debug_span!("datastore");
// The redis crate is not working with tables, thus the response will be a Vec of counter values
let script_res: Vec<i64> = script_invocation
.invoke_async(redis_conn)
.instrument(span)
.await?;

// We need to update the values and ttls returned by redis
let counters_range = 0..res.len();
let script_res_range = (0..script_res.len()).step_by(2);

for (i, j) in counters_range.zip(script_res_range) {
let (_, val, ttl) = &mut res[i];
*val = script_res[j];
*ttl = script_res[j + 1];
}
let span = debug_span!("datastore");
// The redis crate is not working with tables, thus the response will be a Vec of counter values
let script_res: Vec<i64> = script_invocation
.invoke_async(redis_conn)
.instrument(span)
.await?;

// We need to update the values and ttls returned by redis
let counters_range = 0..res.len();
let script_res_range = (0..script_res.len()).step_by(2);

for (i, j) in counters_range.zip(script_res_range) {
let (_, val, delta, ttl) = &mut res[i];
*val = script_res[j];
*delta = script_res[j] - *delta;
*ttl = script_res[j + 1];
}
res
};

Ok(res)
}
Expand Down Expand Up @@ -359,15 +364,15 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(

let time_start_update_counters = Instant::now();

for (counter, value, ttl) in updated_counters {
cached_counters.insert(
for (counter, new_value, remote_deltas, ttl) in updated_counters {
cached_counters.apply_remote_delta(
counter,
Option::from(value),
new_value,
remote_deltas,
ttl,
Duration::from_millis(
(Instant::now() - time_start_update_counters).as_millis() as u64
),
SystemTime::now(),
);
}
}
Expand All @@ -388,7 +393,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;

#[tokio::test]
async fn errs_on_bad_url() {
Expand All @@ -408,6 +413,10 @@ mod tests {

#[tokio::test]
async fn batch_update_counters() {
const NEW_VALUE_FROM_REDIS: i64 = 10;
const INITIAL_VALUE_FROM_REDIS: i64 = 1;
const LOCAL_INCREMENTS: i64 = 2;

let mut counters_and_deltas = HashMap::new();
let counter = Counter::new(
Limit::new(
Expand All @@ -422,13 +431,13 @@ mod tests {

let arc = Arc::new(CachedCounterValue::from_authority(
&counter,
1,
INITIAL_VALUE_FROM_REDIS,
Duration::from_secs(60),
));
arc.delta(&counter, 1);
arc.delta(&counter, LOCAL_INCREMENTS);
counters_and_deltas.insert(counter.clone(), arc);

let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]);
let mock_response = Value::Bulk(vec![Value::Int(NEW_VALUE_FROM_REDIS), Value::Int(60)]);

let mut mock_client = MockRedisConnection::new(vec![MockCmd::new(
redis::cmd("EVALSHA")
Expand All @@ -437,21 +446,22 @@ mod tests {
.arg(key_for_counter(&counter))
.arg(key_for_counters_of_limit(counter.limit()))
.arg(60)
.arg(1),
.arg(LOCAL_INCREMENTS),
Ok(mock_response),
)]);

let result = update_counters(&mut mock_client, counters_and_deltas).await;

assert!(result.is_ok());
let mut result = update_counters(&mut mock_client, counters_and_deltas)
.await
.unwrap();

let (c, v, t) = result.unwrap()[0].clone();
let (c, new_value, remote_increments, new_ttl) = result.remove(0);
assert_eq!(key_for_counter(&counter), key_for_counter(&c));
assert_eq!(NEW_VALUE_FROM_REDIS, new_value);
assert_eq!(
"req.method == \"GET\"",
c.limit().conditions().iter().collect::<Vec<_>>()[0]
NEW_VALUE_FROM_REDIS - INITIAL_VALUE_FROM_REDIS - LOCAL_INCREMENTS,
remote_increments
);
assert_eq!(10, v);
assert_eq!(60, t);
assert_eq!(60, new_ttl);
}

#[tokio::test]
Expand Down Expand Up @@ -480,18 +490,12 @@ mod tests {
Ok(mock_response),
)]);

let cache = CountersCacheBuilder::new().build(Duration::from_millis(1));
let cache = CountersCacheBuilder::new().build(Duration::from_millis(10));
cache.batcher().add(
counter.clone(),
Arc::new(CachedCounterValue::load_from_authority_asap(&counter, 2)),
);
cache.insert(
counter.clone(),
Some(1),
10,
Duration::from_secs(0),
SystemTime::now(),
);

let cached_counters: Arc<CountersCache> = Arc::new(cache);
let partitioned = Arc::new(AtomicBool::new(false));

Expand Down

0 comments on commit 1a5a24d

Please sign in to comment.