Skip to content

Commit

Permalink
All redis integer types as i64
Browse files Browse the repository at this point in the history
As per the docs:
>Integers
>
>This type is a CRLF-terminated string that represents a signed, base-10, 64-bit integer.
  • Loading branch information
alexsnaps committed May 15, 2024
1 parent 41b5103 commit c92eb9d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
7 changes: 6 additions & 1 deletion limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ impl CounterStorage for RocksDbStorage {

if load_counters {
counter.set_expires_in(ttl);
counter.set_remaining(counter.max_value().checked_sub(val + delta).unwrap_or_default());
counter.set_remaining(
counter
.max_value()
.checked_sub(val + delta)
.unwrap_or_default(),
);
}

if counter.max_value() < val + delta {
Expand Down
10 changes: 5 additions & 5 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,10 @@ impl CountersCache {
counter: Counter,
redis_val: u64,
remote_deltas: u64,
redis_expiry: u64,
redis_expiry: i64,
) -> Arc<CachedCounterValue> {
if redis_expiry > 0 {
let expiry_ts = SystemTime::UNIX_EPOCH + Duration::from_millis(redis_expiry);
let expiry_ts = SystemTime::UNIX_EPOCH + Duration::from_millis(redis_expiry as u64);
if expiry_ts > SystemTime::now() {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
Expand Down Expand Up @@ -539,7 +539,7 @@ mod tests {
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as u64,
.as_micros() as i64,
);

assert!(cache.get(&counter).is_some());
Expand Down Expand Up @@ -569,7 +569,7 @@ mod tests {
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as u64,
.as_micros() as i64,
);

assert_eq!(
Expand All @@ -593,7 +593,7 @@ mod tests {
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as u64,
.as_micros() as i64,
);
cache.increase_by(&counter, increase_by).await;

Expand Down
12 changes: 6 additions & 6 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ impl AsyncCounterStorage for AsyncRedisStorage {
let mut con = self.conn_manager.clone();

match con
.get::<String, Option<u64>>(key_for_counter(counter))
.get::<String, Option<i64>>(key_for_counter(counter))
.instrument(debug_span!("datastore"))
.await?
{
Some(val) => Ok(val + delta <= counter.max_value()),
Some(val) => Ok(u64::try_from(val).unwrap_or(0) + delta <= counter.max_value()),
None => Ok(counter.max_value().checked_sub(delta).is_some()),
}
}
Expand Down Expand Up @@ -89,7 +89,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
return Ok(res);
}
} else {
let counter_vals: Vec<Option<u64>> = {
let counter_vals: Vec<Option<i64>> = {
redis::cmd("MGET")
.arg(counter_keys.clone())
.query_async(&mut con)
Expand All @@ -101,7 +101,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
// remaining = max - (curr_val + delta)
let remaining = counter
.max_value()
.checked_sub(counter_vals[i].unwrap_or(0) + delta);
.checked_sub(u64::try_from(counter_vals[i].unwrap_or(0)).unwrap_or(0) + delta);
if remaining.is_none() {
return Ok(Authorization::Limited(
counter.limit().name().map(|n| n.to_owned()),
Expand Down Expand Up @@ -150,12 +150,12 @@ impl AsyncCounterStorage for AsyncRedisStorage {
// This does not cause any bugs, but consumes memory
// unnecessarily.
let option = {
con.get::<String, Option<u64>>(counter_key.clone())
con.get::<String, Option<i64>>(counter_key.clone())
.instrument(debug_span!("datastore"))
.await?
};
if let Some(val) = option {
counter.set_remaining(limit.max_value() - val);
counter.set_remaining(limit.max_value() - u64::try_from(val).unwrap_or(0));
let ttl = {
con.ttl(&counter_key)
.instrument(debug_span!("datastore"))
Expand Down
18 changes: 12 additions & 6 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ impl AsyncCounterStorage for CachedRedisStorage {
first_limited = Some(a);
}
if load_counters {
counter.set_remaining(val.remaining(counter).checked_sub(delta).unwrap_or_default());
counter.set_remaining(
val.remaining(counter)
.checked_sub(delta)
.unwrap_or_default(),
);
counter.set_expires_in(val.to_next_window());
}
}
Expand Down Expand Up @@ -280,14 +284,14 @@ impl CachedRedisStorageBuilder {
async fn update_counters<C: ConnectionLike>(
redis_conn: &mut C,
counters_and_deltas: HashMap<Counter, Arc<CachedCounterValue>>,
) -> Result<Vec<(Counter, u64, u64, u64)>, StorageErr> {
) -> Result<Vec<(Counter, u64, u64, i64)>, StorageErr> {
let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

let res = if counters_and_deltas.is_empty() {
Default::default()
} else {
let mut res: Vec<(Counter, u64, u64, u64)> = Vec::with_capacity(counters_and_deltas.len());
let mut res: Vec<(Counter, u64, u64, i64)> = Vec::with_capacity(counters_and_deltas.len());

for (counter, value) in counters_and_deltas {
let (delta, last_value_from_redis) = value
Expand All @@ -305,7 +309,7 @@ async fn update_counters<C: ConnectionLike>(

let span = debug_span!("datastore");
// The redis crate is not working with tables, thus the response will be a Vec of counter values
let script_res: Vec<u64> = script_invocation
let script_res: Vec<i64> = script_invocation
.invoke_async(redis_conn)
.instrument(span)
.await?;
Expand All @@ -316,8 +320,10 @@ async fn update_counters<C: ConnectionLike>(

for (i, j) in counters_range.zip(script_res_range) {
let (_, val, delta, expires_at) = &mut res[i];
*val = script_res[j];
*delta = script_res[j] - *delta;
*val = u64::try_from(script_res[j]).unwrap_or(0);
*delta = u64::try_from(script_res[j])
.unwrap_or(0)
.saturating_sub(*delta);
*expires_at = script_res[j + 1];
}
res
Expand Down

0 comments on commit c92eb9d

Please sign in to comment.