From 78167670b88276a7663f520f660c22f4e2398882 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 2 May 2023 16:35:27 -0400 Subject: [PATCH] Be optimistic wrt interleaved writes --- limitador/src/storage/disk/expiring_value.rs | 53 ++++++++++++------- limitador/src/storage/disk/rocksdb_storage.rs | 35 +++++++----- 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/limitador/src/storage/disk/expiring_value.rs b/limitador/src/storage/disk/expiring_value.rs index 30183b8e..cdfd5d5e 100644 --- a/limitador/src/storage/disk/expiring_value.rs +++ b/limitador/src/storage/disk/expiring_value.rs @@ -2,7 +2,7 @@ use crate::storage::StorageErr; use std::array::TryFromSliceError; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) struct ExpiringValue { value: i64, expiry: SystemTime, @@ -24,12 +24,25 @@ impl ExpiringValue { self.value_at(SystemTime::now()) } - pub fn merge(&mut self, other: ExpiringValue, now: SystemTime) { + pub fn update(self, delta: i64, ttl: u64, now: SystemTime) -> Self { + let expiry = if self.expiry <= now { + now + Duration::from_secs(ttl) + } else { + self.expiry + }; + + let value = self.value_at(now) + delta; + Self { value, expiry } + } + + pub fn merge(self, other: ExpiringValue, now: SystemTime) -> Self { if self.expiry > now { - self.value += other.value; + ExpiringValue { + value: self.value + other.value, + expiry: self.expiry, + } } else { - self.value = other.value; - self.expiry = other.expiry; + other } } @@ -113,21 +126,21 @@ mod tests { assert_eq!(val.value_at(now), 0); } - // #[test] - // fn updates_when_valid() { - // let now = SystemTime::now(); - // let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10); - // assert_eq!(val.value_at(now - Duration::from_secs(1)), 45); - // } - // - // #[test] - // fn updates_when_expired() { - // let now = SystemTime::now(); - // let val = ExpiringValue::new(42, now); - // assert_eq!(val.ttl(), Duration::ZERO); - // let val = val.update(3, 10); - // assert_eq!(val.value_at(now - Duration::from_secs(1)), 3); - // } + #[test] + fn updates_when_valid() { + let now = SystemTime::now(); + let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now); + assert_eq!(val.value_at(now - Duration::from_secs(1)), 45); + } + + #[test] + fn updates_when_expired() { + let now = SystemTime::now(); + let val = ExpiringValue::new(42, now); + assert_eq!(val.ttl(), Duration::ZERO); + let val = val.update(3, 10, now); + assert_eq!(val.value_at(now - Duration::from_secs(1)), 3); + } #[test] fn from_into_vec() { diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index 51a1a951..b4b18124 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -125,14 +125,16 @@ impl RocksDbStorage { } opts.set_merge_operator_associative("ExpiringValueMerge", |_key, start, operands| { let now = SystemTime::now(); - let mut start: ExpiringValue = start + let mut value: ExpiringValue = start .map(|raw: &[u8]| raw.try_into().unwrap_or_default()) .unwrap_or_default(); for op in operands { - let new: ExpiringValue = op.try_into().expect("This can't fail!"); - start.merge(new, now); + // ignore (corrupted?) values pending merges + if let Ok(pending) = ExpiringValue::try_from(op) { + value = value.merge(pending, now); + } } - Some(Vec::from(start)) + Some(Vec::from(value)) }); opts.create_if_missing(true); let db = DB::open(&opts, path).unwrap(); @@ -145,15 +147,22 @@ impl RocksDbStorage { counter: &Counter, delta: i64, ) -> Result { - let expiring_value = ExpiringValue::new( - delta, - SystemTime::now() + Duration::from_secs(counter.limit().seconds()), - ); - self.db - .merge(key, >>::into(expiring_value))?; - let result = self.db.get(key)?.expect("There is always a counter now"); - let slice: &[u8] = result.as_ref(); - Ok(slice.try_into()?) + let now = SystemTime::now(); + let value = match self.db.get(key)? { + None => ExpiringValue::default(), + Some(raw) => { + let slice: &[u8] = raw.as_ref(); + slice.try_into()? + } + }; + if value.value_at(now) + delta <= counter.max_value() { + let expiring_value = + ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds())); + self.db + .merge(key, >>::into(expiring_value))?; + return Ok(value.update(delta, counter.seconds(), now)); + } + Ok(value) } }