
Commit

Merge pull request #382 from Kuadrant/pipeline
Update Redis crate and use pipeline to update counters
alexsnaps authored Oct 9, 2024
2 parents 7fe2cce + 9ec7b63 commit 69e43a9
Showing 5 changed files with 95 additions and 41 deletions.
36 changes: 28 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default; the Cargo.lock diff is omitted here.

7 changes: 4 additions & 3 deletions limitador/Cargo.toml
@@ -36,11 +36,12 @@ metrics = "0.22.3"

# Optional dependencies
rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] }
- redis = { version = "0.25", optional = true, features = [
+ redis = { version = "0.27", optional = true, features = [
  "connection-manager",
  "tokio-comp",
  "tls-native-tls",
  "tokio-native-tls-comp",
+   "script",
] }
r2d2 = { version = "0.8", optional = true }
tokio = { version = "1", optional = true, features = [
@@ -62,8 +63,8 @@ time = "0.3.36"
[dev-dependencies]
serial_test = "3.0"
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
- redis-test = { version = "0.4.0", features = ["aio"] }
- redis = { version = "0.25", features = [
+ redis-test = { version = "0.6.0", features = ["aio"] }
+ redis = { version = "0.27", features = [
  "connection-manager",
  "tokio-comp",
  "tls-native-tls",
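
For context on the version bump: the added "script" feature keeps the redis::Script helpers available for the pipelined Lua updates introduced below, and redis 0.27 drops the connection-type parameter from the async turbofish, which most of the mechanical changes in this diff adapt to. A minimal sketch of the new call shape (the `ping` helper is illustrative, not part of this PR):

```rust
use redis::aio::ConnectionManager;
use redis::RedisResult;

// redis 0.25 named the connection type in the turbofish:
//     cmd.query_async::<_, ()>(&mut con).await?;
// redis 0.27 infers it, so only the result type is spelled out:
//     cmd.query_async::<()>(&mut con).await?;
async fn ping(con: &mut ConnectionManager) -> RedisResult<()> {
    redis::cmd("PING").query_async::<()>(con).await
}
```
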
67 changes: 49 additions & 18 deletions limitador/src/storage/redis/redis_async.rs
@@ -9,7 +9,7 @@ use crate::storage::redis::is_limited
use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
- use redis::{AsyncCommands, RedisError};
+ use redis::{AsyncCommands, ErrorKind, RedisError};
use std::collections::HashSet;
use std::ops::Deref;
use std::str::FromStr;
@@ -56,7 +56,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
    .key(key_for_counters_of_limit(counter.limit()))
    .arg(counter.window().as_secs())
    .arg(delta)
-     .invoke_async::<_, ()>(&mut con)
+     .invoke_async::<()>(&mut con)
    .instrument(info_span!("datastore"))
    .await?;

@@ -112,17 +112,35 @@ impl AsyncCounterStorage for AsyncRedisStorage {
    }
}

- // TODO: this can be optimized by using pipelines with multiple updates
- for (counter_idx, key) in counter_keys.into_iter().enumerate() {
+ let script = redis::Script::new(SCRIPT_UPDATE_COUNTER);
+ let mut pipeline = redis::pipe();
+ let mut pipeline = &mut pipeline;
+ for (counter_idx, key) in counter_keys.iter().enumerate() {
      let counter = &counters[counter_idx];
-     redis::Script::new(SCRIPT_UPDATE_COUNTER)
-         .key(key)
-         .key(key_for_counters_of_limit(counter.limit()))
-         .arg(counter.window().as_secs())
-         .arg(delta)
-         .invoke_async::<_, _>(&mut con)
-         .instrument(info_span!("datastore"))
-         .await?
+     pipeline = pipeline
+         .invoke_script(
+             script
+                 .key(key)
+                 .key(key_for_counters_of_limit(counter.limit()))
+                 .arg(counter.window().as_secs())
+                 .arg(delta),
+         )
+         .ignore()
  }
+ if let Err(err) = pipeline
+     .query_async::<()>(&mut con)
+     .instrument(info_span!("datastore"))
+     .await
+ {
+     if err.kind() == ErrorKind::NoScriptError {
+         script.prepare_invoke().load_async(&mut con).await?;
+         pipeline
+             .query_async::<()>(&mut con)
+             .instrument(info_span!("datastore"))
+             .await?;
+     } else {
+         Err(err)?;
+     }
+ }

Ok(Authorization::Ok)
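
The rewritten loop above queues one EVALSHA per counter in a single pipeline and sends them in one round trip; if the server has not cached the script (for example after a restart or failover), the NoScriptError branch loads it once and replays the pipeline. A self-contained sketch of the same pattern, using a hypothetical `update_counters` helper and a simplified script that takes one key and one delta:

```rust
use redis::aio::ConnectionManager;
use redis::{ErrorKind, RedisError, Script};

// Hypothetical helper illustrating the pattern used above: batch EVALSHA calls
// in a pipeline, fall back to SCRIPT LOAD + retry on a NOSCRIPT error.
async fn update_counters(
    mut con: ConnectionManager,
    lua_source: &str,
    keys: &[String],
    delta: u64,
) -> Result<(), RedisError> {
    let script = Script::new(lua_source);
    let mut pipeline = redis::pipe();
    for key in keys {
        // Each invocation becomes one EVALSHA queued in the pipeline.
        pipeline.invoke_script(script.key(key).arg(delta)).ignore();
    }
    if let Err(err) = pipeline.query_async::<()>(&mut con).await {
        if err.kind() == ErrorKind::NoScriptError {
            // The server does not know the script's SHA yet: load it, then replay.
            script.prepare_invoke().load_async(&mut con).await?;
            pipeline.query_async::<()>(&mut con).await?;
        } else {
            return Err(err);
        }
    }
    Ok(())
}
```
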
@@ -191,7 +209,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
async fn clear(&self) -> Result<(), StorageErr> {
    let mut con = self.conn_manager.clone();
    redis::cmd("FLUSHDB")
-         .query_async::<_, ()>(&mut con)
+         .query_async::<()>(&mut con)
        .instrument(info_span!("datastore"))
        .await?;
    Ok(())
@@ -201,17 +219,23 @@ impl AsyncRedisStorage {
impl AsyncRedisStorage {
    pub async fn new(redis_url: &str) -> Result<Self, RedisError> {
        let info = ConnectionInfo::from_str(redis_url)?;
-         Ok(Self {
-             conn_manager: ConnectionManager::new(
+         Self::new_with_conn_manager(
+             ConnectionManager::new(
                redis::Client::open(info)
                    .expect("This couldn't fail in the past, yet now it did somehow!"),
            )
            .await?,
-         })
+         )
+         .await
    }

-     pub fn new_with_conn_manager(conn_manager: ConnectionManager) -> Self {
-         Self { conn_manager }
+     pub async fn new_with_conn_manager(
+         conn_manager: ConnectionManager,
+     ) -> Result<Self, RedisError> {
+         let store = Self { conn_manager };
+         store.load_script(SCRIPT_UPDATE_COUNTER).await?;
+         store.load_script(VALUES_AND_TTLS).await?;
+         Ok(store)
    }

    async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
@@ -233,6 +257,13 @@ impl AsyncRedisStorage {

        Ok(())
    }
+
+     pub(super) async fn load_script(&self, script: &str) -> Result<(), RedisError> {
+         let mut con = self.conn_manager.clone();
+         let script = redis::Script::new(script);
+         script.prepare_invoke().load_async(&mut con).await?;
+         Ok(())
+     }
}

#[cfg(test)]
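
The constructors now pre-load the Lua scripts when the storage is created, so the first pipelined EVALSHA normally finds them already cached; the NoScriptError fallback above still covers a server that later loses its script cache. A rough sketch of what that preloading amounts to (the `preload` helper is illustrative, not the PR's API):

```rust
use redis::aio::ConnectionManager;
use redis::{RedisError, Script};

// Illustrative helper: issue SCRIPT LOAD for each Lua source so that later
// EVALSHA calls hit the server-side script cache.
async fn preload(mut con: ConnectionManager, sources: &[&str]) -> Result<(), RedisError> {
    for src in sources {
        // SCRIPT LOAD caches the Lua source on the server under its SHA-1.
        Script::new(src).prepare_invoke().load_async(&mut con).await?;
    }
    Ok(())
}
```
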
24 changes: 13 additions & 11 deletions limitador/src/storage/redis/redis_cached.rs
@@ -13,7 +13,7 @@ use crate::storage::redis::{
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use metrics::gauge;
- use redis::aio::{ConnectionLike, ConnectionManager};
+ use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig};
use redis::{ConnectionInfo, RedisError};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
@@ -170,15 +170,13 @@ impl CachedRedisStorage {
        response_timeout: Duration,
    ) -> Result<Self, RedisError> {
        let info = ConnectionInfo::from_str(redis_url)?;
-         let redis_conn_manager = ConnectionManager::new_with_backoff_and_timeouts(
+         let redis_conn_manager = ConnectionManager::new_with_config(
            redis::Client::open(info)
                .expect("This couldn't fail in the past, yet now it did somehow!"),
-             2,
-             100,
-             1,
-             response_timeout,
-             // TLS handshake might result in an additional 2 RTTs to Redis, adding some headroom as well
-             (response_timeout * 3) + Duration::from_millis(50),
+             ConnectionManagerConfig::default()
+                 .set_connection_timeout((response_timeout * 3) + Duration::from_millis(50))
+                 .set_response_timeout(response_timeout)
+                 .set_number_of_retries(1),
        )
        .await?;
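
redis 0.27 replaces the positional arguments of new_with_backoff_and_timeouts with a ConnectionManagerConfig builder, which is what the hunk above adopts. A minimal standalone sketch of the builder (the `connect` helper is hypothetical; the timeout arithmetic mirrors the values above):

```rust
use std::time::Duration;
use redis::aio::{ConnectionManager, ConnectionManagerConfig};
use redis::RedisResult;

// Hypothetical helper showing the redis 0.27 builder-style connection manager setup.
async fn connect(url: &str, response_timeout: Duration) -> RedisResult<ConnectionManager> {
    let config = ConnectionManagerConfig::default()
        // TLS handshakes can cost extra round trips, so allow more time to connect.
        .set_connection_timeout(response_timeout * 3 + Duration::from_millis(50))
        .set_response_timeout(response_timeout)
        .set_number_of_retries(1);
    ConnectionManager::new_with_config(redis::Client::open(url)?, config).await
}
```
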

@@ -189,7 +187,7 @@ impl CachedRedisStorage {
        let counters_cache = Arc::new(cached_counters);
        let partitioned = Arc::new(AtomicBool::new(false));
        let async_redis_storage =
-             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
+             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()).await?;

        {
            let counters_cache_clone = counters_cache.clone();
@@ -208,6 +206,10 @@ impl CachedRedisStorage {
        });
    }

+     async_redis_storage
+         .load_script(BATCH_UPDATE_COUNTERS)
+         .await?;
+
    Ok(Self {
        cached_counters: counters_cache,
        async_redis_storage,
@@ -456,7 +458,7 @@ mod tests {
        counters_and_deltas.insert(counter.clone(), arc);

        let one_sec_from_now = SystemTime::now().add(Duration::from_secs(1));
-         let mock_response = Value::Bulk(vec![
+         let mock_response = Value::Array(vec![
            Value::Int(NEW_VALUE_FROM_REDIS as i64),
            Value::Int(
                one_sec_from_now
@@ -510,7 +512,7 @@ mod tests {
            Default::default(),
        );

-         let mock_response = Value::Bulk(vec![
+         let mock_response = Value::Array(vec![
            Value::Int(8),
            Value::Int(
                SystemTime::now()
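
The test-only changes track a rename in the redis crate: Value::Bulk from 0.25 is, as far as I can tell, Value::Array in 0.27 (part of the RESP3-oriented Value rework), so mocked replies are rebuilt with the new variant. A tiny illustrative helper:

```rust
use redis::Value;

// Illustrative: a mocked script reply of [counter value, ttl] built with the
// renamed variant (Value::Bulk in redis 0.25 became Value::Array in 0.27).
fn mock_counter_reply(value: i64, ttl_ms: i64) -> Value {
    Value::Array(vec![Value::Int(value), Value::Int(ttl_ms)])
}
```
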
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/redis_sync.rs
@@ -165,7 +165,7 @@ impl CounterStorage for RedisStorage
#[tracing::instrument(skip_all)]
fn clear(&self) -> Result<(), StorageErr> {
    let mut con = self.conn_pool.get()?;
-     redis::cmd("FLUSHDB").execute(&mut *con);
+     redis::cmd("FLUSHDB").exec(&mut *con)?;
    Ok(())
}
}
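
The synchronous storage picks up the same crate change: Cmd::execute, which returned nothing, gives way to exec, which returns a RedisResult<()>, so the `?` above lets a failed FLUSHDB propagate as an error. A minimal sketch (the `flush_all` helper is hypothetical):

```rust
use redis::{Connection, RedisResult};

// Hypothetical helper: with redis 0.27, exec returns a RedisResult<()> so the
// FLUSHDB outcome can be propagated with `?`.
fn flush_all(con: &mut Connection) -> RedisResult<()> {
    redis::cmd("FLUSHDB").exec(con)
}
```
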
