Skip to content

Commit

Permalink
Be optimistic wrt interleaved writes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 2, 2023
1 parent 782ee3c commit 7816767
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 33 deletions.
53 changes: 33 additions & 20 deletions limitador/src/storage/disk/expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::storage::StorageErr;
use std::array::TryFromSliceError;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) struct ExpiringValue {
value: i64,
expiry: SystemTime,
Expand All @@ -24,12 +24,25 @@ impl ExpiringValue {
self.value_at(SystemTime::now())
}

pub fn merge(&mut self, other: ExpiringValue, now: SystemTime) {
pub fn update(self, delta: i64, ttl: u64, now: SystemTime) -> Self {
let expiry = if self.expiry <= now {
now + Duration::from_secs(ttl)
} else {
self.expiry
};

let value = self.value_at(now) + delta;
Self { value, expiry }
}

pub fn merge(self, other: ExpiringValue, now: SystemTime) -> Self {
if self.expiry > now {
self.value += other.value;
ExpiringValue {
value: self.value + other.value,
expiry: self.expiry,
}
} else {
self.value = other.value;
self.expiry = other.expiry;
other
}
}

Expand Down Expand Up @@ -113,21 +126,21 @@ mod tests {
assert_eq!(val.value_at(now), 0);
}

// #[test]
// fn updates_when_valid() {
// let now = SystemTime::now();
// let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10);
// assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
// }
//
// #[test]
// fn updates_when_expired() {
// let now = SystemTime::now();
// let val = ExpiringValue::new(42, now);
// assert_eq!(val.ttl(), Duration::ZERO);
// let val = val.update(3, 10);
// assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
// }
#[test]
fn updates_when_valid() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

#[test]
fn updates_when_expired() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
let val = val.update(3, 10, now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

#[test]
fn from_into_vec() {
Expand Down
35 changes: 22 additions & 13 deletions limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,16 @@ impl RocksDbStorage {
}
opts.set_merge_operator_associative("ExpiringValueMerge", |_key, start, operands| {
let now = SystemTime::now();
let mut start: ExpiringValue = start
let mut value: ExpiringValue = start
.map(|raw: &[u8]| raw.try_into().unwrap_or_default())
.unwrap_or_default();
for op in operands {
let new: ExpiringValue = op.try_into().expect("This can't fail!");
start.merge(new, now);
// ignore (corrupted?) values pending merges
if let Ok(pending) = ExpiringValue::try_from(op) {
value = value.merge(pending, now);
}
}
Some(Vec::from(start))
Some(Vec::from(value))
});
opts.create_if_missing(true);
let db = DB::open(&opts, path).unwrap();
Expand All @@ -145,15 +147,22 @@ impl RocksDbStorage {
counter: &Counter,
delta: i64,
) -> Result<ExpiringValue, StorageErr> {
let expiring_value = ExpiringValue::new(
delta,
SystemTime::now() + Duration::from_secs(counter.limit().seconds()),
);
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
let result = self.db.get(key)?.expect("There is always a counter now");
let slice: &[u8] = result.as_ref();
Ok(slice.try_into()?)
let now = SystemTime::now();
let value = match self.db.get(key)? {
None => ExpiringValue::default(),
Some(raw) => {
let slice: &[u8] = raw.as_ref();
slice.try_into()?
}
};
if value.value_at(now) + delta <= counter.max_value() {
let expiring_value =
ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds()));
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
return Ok(value.update(delta, counter.seconds(), now));
}
Ok(value)
}
}

Expand Down

0 comments on commit 7816767

Please sign in to comment.