Skip to content

Commit

Permalink
Merge pull request #288 from Kuadrant/cached-value
Browse files Browse the repository at this point in the history
Cached value
  • Loading branch information
alexsnaps authored Apr 10, 2024
2 parents ae41cd9 + 954c530 commit c672633
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 151 deletions.
16 changes: 0 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion limitador-server/examples/limits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
conditions:
- "req.method == 'POST'"
variables:
- user_id
- user_id
1 change: 0 additions & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ lenient_conditions = []
[dependencies]
moka = "0.11.2"
getrandom = { version = "0.2", features = ["js"] }
ttl_cache = "0.5"
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
serde_json = "1"
Expand Down
98 changes: 73 additions & 25 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug)]
pub(crate) struct AtomicExpiringValue {
value: AtomicI64,
expiry: AtomicU64, // in microseconds
expiry: AtomicExpiryTime,
}

impl AtomicExpiringValue {
pub fn new(value: i64, expiry: SystemTime) -> Self {
let expiry = Self::get_duration_micros(expiry);
Self {
value: AtomicI64::new(value),
expiry: AtomicU64::new(expiry),
expiry: AtomicExpiryTime::new(expiry),
}
}

pub fn value_at(&self, when: SystemTime) -> i64 {
let when = Self::get_duration_micros(when);
let expiry = self.expiry.load(Ordering::SeqCst);
if expiry <= when {
if self.expiry.expired_at(when) {
return 0;
}
self.value.load(Ordering::SeqCst)
Expand All @@ -31,44 +28,95 @@ impl AtomicExpiringValue {
}

pub fn update(&self, delta: i64, ttl: u64, when: SystemTime) -> i64 {
let ttl_micros = ttl * 1_000_000;
let when_micros = Self::get_duration_micros(when);

let expiry = self.expiry.load(Ordering::SeqCst);
if expiry <= when_micros {
let new_expiry = when_micros + ttl_micros;
if self
.expiry
.compare_exchange(expiry, new_expiry, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
self.value.store(delta, Ordering::SeqCst);
}
if self.expiry.update_if_expired(ttl, when) {
self.value.store(delta, Ordering::SeqCst);
return delta;
}
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
}

#[allow(dead_code)]
pub fn set(&self, value: i64, ttl: Duration) {
self.expiry.update(ttl);
self.value.store(value, Ordering::SeqCst);
}
}

#[derive(Debug)]
pub struct AtomicExpiryTime {
expiry: AtomicU64, // in microseconds
}

impl AtomicExpiryTime {
pub fn new(when: SystemTime) -> Self {
let expiry = Self::since_epoch(when);
Self {
expiry: AtomicU64::new(expiry),
}
}

#[allow(dead_code)]
pub fn from_now(ttl: Duration) -> Self {
Self::new(SystemTime::now() + ttl)
}

fn since_epoch(when: SystemTime) -> u64 {
when.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_micros() as u64
}

pub fn duration(&self) -> Duration {
let expiry =
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst));
expiry
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO)
}

fn get_duration_micros(when: SystemTime) -> u64 {
when.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_micros() as u64
pub fn expired_at(&self, when: SystemTime) -> bool {
let when = Self::since_epoch(when);
self.expiry.load(Ordering::SeqCst) <= when
}

#[allow(dead_code)]
pub fn update(&self, ttl: Duration) {
self.expiry
.store(Self::since_epoch(SystemTime::now() + ttl), Ordering::SeqCst);
}

pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool {
let ttl_micros = ttl * 1_000_000;
let when_micros = Self::since_epoch(when);
let expiry = self.expiry.load(Ordering::SeqCst);
if expiry <= when_micros {
let new_expiry = when_micros + ttl_micros;
return self
.expiry
.compare_exchange(expiry, new_expiry, Ordering::SeqCst, Ordering::SeqCst)
.is_ok();
}
false
}
}

impl Clone for AtomicExpiryTime {
fn clone(&self) -> Self {
Self {
expiry: AtomicU64::new(self.expiry.load(Ordering::SeqCst)),
}
}
}

impl Default for AtomicExpiringValue {
fn default() -> Self {
AtomicExpiringValue {
value: AtomicI64::new(0),
expiry: AtomicU64::new(0),
expiry: AtomicExpiryTime::new(UNIX_EPOCH),
}
}
}
Expand All @@ -77,7 +125,7 @@ impl Clone for AtomicExpiringValue {
fn clone(&self) -> Self {
AtomicExpiringValue {
value: AtomicI64::new(self.value.load(Ordering::SeqCst)),
expiry: AtomicU64::new(self.expiry.load(Ordering::SeqCst)),
expiry: self.expiry.clone(),
}
}
}
Expand Down
Loading

0 comments on commit c672633

Please sign in to comment.