diff --git a/Cargo.lock b/Cargo.lock
index 66c2c852..fb7b7109 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -733,6 +733,7 @@ dependencies = [
  "ciborium",
  "clap",
  "criterion-plot",
+ "futures",
  "is-terminal",
  "itertools 0.10.5",
  "num-traits",
@@ -745,6 +746,7 @@ dependencies = [
  "serde_derive",
  "serde_json",
  "tinytemplate",
+ "tokio",
  "walkdir",
 ]
diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs
index 6a0e5220..13c965b1 100644
--- a/limitador-server/src/main.rs
+++ b/limitador-server/src/main.rs
@@ -166,7 +166,7 @@ impl Limiter {
                 cfg.name,
                 cfg.cache_size.or_else(guess_cache_size).unwrap(),
                 cfg.local,
-                cfg.broadcast,
+                Some(cfg.broadcast),
             );
             let rate_limiter_builder =
                 RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));
diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index 8f0a681b..366db867 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -51,7 +51,7 @@ tokio = { version = "1", optional = true, features = [
 
 [dev-dependencies]
 serial_test = "3.0"
-criterion = { version = "0.5.1", features = ["html_reports"] }
+criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
 redis-test = { version = "0.4.0", features = ["aio"] }
 paste = "1"
 rand = "0.8"
diff --git a/limitador/benches/bench.rs b/limitador/benches/bench.rs
index 5905a7af..111bad1f 100644
--- a/limitador/benches/bench.rs
+++ b/limitador/benches/bench.rs
@@ -1,15 +1,21 @@
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+use std::future::Future;
+use std::time::Instant;
+
 use criterion::{black_box, criterion_group, criterion_main, Bencher, BenchmarkId, Criterion};
 use rand::seq::SliceRandom;
+use rand::SeedableRng;
 
 use limitador::limit::Limit;
 #[cfg(feature = "disk_storage")]
 use limitador::storage::disk::{DiskStorage, OptimizeFor};
+#[cfg(feature = "distributed_storage")]
+use limitador::storage::distributed::CrInMemoryStorage;
 use limitador::storage::in_memory::InMemoryStorage;
-use limitador::storage::CounterStorage;
-use limitador::RateLimiter;
-use rand::SeedableRng;
-use std::collections::HashMap;
-use std::fmt::{Display, Formatter};
+use limitador::storage::redis::CachedRedisStorageBuilder;
+use limitador::storage::{AsyncCounterStorage, CounterStorage};
+use limitador::{AsyncRateLimiter, RateLimiter};
 
 const SEED: u64 = 42;
 
@@ -18,9 +24,32 @@
 criterion_group!(benches, bench_in_mem);
 #[cfg(all(feature = "disk_storage", not(feature = "redis_storage")))]
 criterion_group!(benches, bench_in_mem, bench_disk);
 #[cfg(all(not(feature = "disk_storage"), feature = "redis_storage"))]
-criterion_group!(benches, bench_in_mem, bench_redis);
-#[cfg(all(feature = "disk_storage", feature = "redis_storage"))]
-criterion_group!(benches, bench_in_mem, bench_disk, bench_redis);
+criterion_group!(benches, bench_in_mem, bench_redis, bench_cached_redis);
+#[cfg(all(
+    feature = "disk_storage",
+    feature = "redis_storage",
+    not(feature = "distributed_storage")
+))]
+criterion_group!(
+    benches,
+    bench_in_mem,
+    bench_disk,
+    bench_redis,
+    bench_cached_redis
+);
+#[cfg(all(
+    feature = "disk_storage",
+    feature = "redis_storage",
+    feature = "distributed_storage"
+))]
+criterion_group!(
+    benches,
+    bench_in_mem,
+    bench_disk,
+    bench_redis,
+    bench_cached_redis,
+    bench_distributed,
+);
 
 criterion_main!(benches);
 
@@ -70,7 +99,7 @@ impl Display for TestScenario {
 }
 
 fn bench_in_mem(c: &mut Criterion) {
-    let mut group = c.benchmark_group("In memory");
+    let mut group = c.benchmark_group("Memory");
     for scenario in TEST_SCENARIOS {
         group.bench_with_input(
             BenchmarkId::new("is_rate_limited", scenario),
@@ -100,6 +129,63 @@ fn bench_in_mem(c: &mut Criterion) {
     group.finish();
 }
 
+#[cfg(feature = "distributed_storage")]
+fn bench_distributed(c: &mut Criterion) {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    let mut group = c.benchmark_group("Distributed");
+    for scenario in TEST_SCENARIOS {
+        group.bench_with_input(
+            BenchmarkId::new("is_rate_limited", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                runtime.block_on(async move {
+                    let storage = Box::new(CrInMemoryStorage::new(
+                        "test_node".to_owned(),
+                        10_000,
+                        "127.0.0.1:0".to_owned(),
+                        None,
+                    ));
+                    bench_is_rate_limited(b, test_scenario, storage);
+                })
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("update_counters", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                runtime.block_on(async move {
+                    let storage = Box::new(CrInMemoryStorage::new(
+                        "test_node".to_owned(),
+                        10_000,
+                        "127.0.0.1:0".to_owned(),
+                        None,
+                    ));
+                    bench_update_counters(b, test_scenario, storage);
+                })
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("check_rate_limited_and_update", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                runtime.block_on(async move {
+                    let storage = Box::new(CrInMemoryStorage::new(
+                        "test_node".to_owned(),
+                        10_000,
+                        "127.0.0.1:0".to_owned(),
+                        None,
+                    ));
+                    bench_check_rate_limited_and_update(b, test_scenario, storage);
+                })
+            },
+        );
+    }
+    group.finish();
+}
 #[cfg(feature = "disk_storage")]
 fn bench_disk(c: &mut Criterion) {
     let mut group = c.benchmark_group("Disk");
@@ -138,6 +224,55 @@ fn bench_disk(c: &mut Criterion) {
     group.finish();
 }
 
+#[cfg(feature = "redis_storage")]
+fn bench_cached_redis(c: &mut Criterion) {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    async fn create_storage() -> Box<dyn AsyncCounterStorage> {
+        let storage_builder = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379");
+        let storage = storage_builder
+            .build()
+            .await
+            .expect("We need a Redis running locally");
+        storage.clear().await.unwrap();
+        Box::new(storage)
+    }
+
+    let mut group = c.benchmark_group("CachedRedis");
+    for scenario in TEST_SCENARIOS {
+        group.bench_with_input(
+            BenchmarkId::new("is_rate_limited", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                async_bench_is_rate_limited(&runtime, b, test_scenario, create_storage);
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("update_counters", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                async_bench_update_counters(&runtime, b, test_scenario, create_storage);
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("check_rate_limited_and_update", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                async_bench_check_rate_limited_and_update(
+                    &runtime,
+                    b,
+                    test_scenario,
+                    create_storage,
+                );
+            },
+        );
+    }
+    group.finish();
+}
+
 #[cfg(feature = "redis_storage")]
 fn bench_redis(c: &mut Criterion) {
     let mut group = c.benchmark_group("Redis");
@@ -195,6 +330,37 @@ fn bench_is_rate_limited(
     })
 }
 
+fn async_bench_is_rate_limited<F>(
+    runtime: &tokio::runtime::Runtime,
+    b: &mut Bencher,
+    test_scenario: &TestScenario,
+    storage: fn() -> F,
+) where
+    F: Future<Output = Box<dyn AsyncCounterStorage>>,
+{
+    b.to_async(runtime).iter_custom(|iters| async move {
+        let storage = storage().await;
+        let (rate_limiter, call_params) = generate_async_test_data(test_scenario, storage);
+        let rng = &mut rand::rngs::StdRng::seed_from_u64(SEED);
+
+        let start = Instant::now();
+        for _i in 0..iters {
+            black_box({
+                let params = call_params.choose(rng).unwrap();
+                rate_limiter
+                    .is_rate_limited(
+                        &params.namespace.to_owned().into(),
+                        &params.values,
+                        params.delta,
+                    )
+                    .await
+                    .unwrap()
+            });
+        }
+        start.elapsed()
+    })
+}
+
 fn bench_update_counters(
     b: &mut Bencher,
     test_scenario: &TestScenario,
@@ -214,7 +380,37 @@ fn bench_update_counters(
                 params.delta,
             )
             .unwrap();
-        black_box(())
+    })
+}
+
+fn async_bench_update_counters<F>(
+    runtime: &tokio::runtime::Runtime,
+    b: &mut Bencher,
+    test_scenario: &TestScenario,
+    storage: fn() -> F,
+) where
+    F: Future<Output = Box<dyn AsyncCounterStorage>>,
+{
+    b.to_async(runtime).iter_custom(|iters| async move {
+        let storage = storage().await;
+        let (rate_limiter, call_params) = generate_async_test_data(test_scenario, storage);
+        let rng = &mut rand::rngs::StdRng::seed_from_u64(SEED);
+
+        let start = Instant::now();
+        for _i in 0..iters {
+            black_box({
+                let params = call_params.choose(rng).unwrap();
+                rate_limiter
+                    .update_counters(
+                        &params.namespace.to_owned().into(),
+                        &params.values,
+                        params.delta,
+                    )
+                    .await
+            })
+            .unwrap();
+        }
+        start.elapsed()
     })
 }
 
@@ -243,6 +439,39 @@ fn bench_check_rate_limited_and_update(
     })
 }
 
+fn async_bench_check_rate_limited_and_update<F>(
+    runtime: &tokio::runtime::Runtime,
+    b: &mut Bencher,
+    test_scenario: &TestScenario,
+    storage: fn() -> F,
+) where
+    F: Future<Output = Box<dyn AsyncCounterStorage>>,
+{
+    b.to_async(runtime).iter_custom(|iters| async move {
+        let storage = storage().await;
+        let (rate_limiter, call_params) = generate_async_test_data(test_scenario, storage);
+        let rng = &mut rand::rngs::StdRng::seed_from_u64(SEED);
+
+        let start = Instant::now();
+        for _i in 0..iters {
+            black_box({
+                let params = call_params.choose(rng).unwrap();
+
+                rate_limiter
+                    .check_rate_limited_and_update(
+                        &params.namespace.to_owned().into(),
+                        &params.values,
+                        params.delta,
+                        false,
+                    )
+                    .await
+                    .unwrap()
+            });
+        }
+        start.elapsed()
+    })
+}
+
 // Notice that this function creates all the limits with the same conditions and
 // variables. Also, all the conditions have the same format: "cond_x == 1".
 // That's to simplify things, those are not the aspects that should have the
@@ -255,6 +484,39 @@ fn generate_test_data(
     scenario: &TestScenario,
     storage: Box<dyn CounterStorage>,
 ) -> (RateLimiter, Vec<TestCallParams>) {
+    let rate_limiter = RateLimiter::new_with_storage(storage);
+
+    let (test_limits, call_params) = generate_test_limits(scenario);
+    for limit in test_limits {
+        rate_limiter.add_limit(limit);
+    }
+
+    (rate_limiter, call_params)
+}
+
+// Notice that this function creates all the limits with the same conditions and
+// variables. Also, all the conditions have the same format: "cond_x == 1".
+// That's to simplify things, those are not the aspects that should have the
+// greatest impact on performance.
+// The limits generated are big enough to avoid being rate-limited during the
+// benchmark.
+// Note that with this test data each request only increases one counter, we can
+// use that as another variable in the future.
+fn generate_async_test_data(
+    scenario: &TestScenario,
+    storage: Box<dyn AsyncCounterStorage>,
+) -> (AsyncRateLimiter, Vec<TestCallParams>) {
+    let rate_limiter = AsyncRateLimiter::new_with_storage(storage);
+
+    let (test_limits, call_params) = generate_test_limits(scenario);
+    for limit in test_limits {
+        rate_limiter.add_limit(limit);
+    }
+
+    (rate_limiter, call_params)
+}
+
+fn generate_test_limits(scenario: &TestScenario) -> (Vec<Limit>, Vec<TestCallParams>) {
     let mut test_values: HashMap<String, String> = HashMap::new();
     let mut conditions = vec![];
@@ -293,12 +555,5 @@ fn generate_test_data(
             delta: 1,
         });
     }
-
-    let rate_limiter = RateLimiter::new_with_storage(storage);
-
-    for limit in test_limits {
-        rate_limiter.add_limit(limit);
-    }
-
-    (rate_limiter, call_params)
+    (test_limits, call_params)
 }
diff --git a/limitador/src/storage/distributed/mod.rs b/limitador/src/storage/distributed/mod.rs
index 5732a322..ff9aa923 100644
--- a/limitador/src/storage/distributed/mod.rs
+++ b/limitador/src/storage/distributed/mod.rs
@@ -249,28 +249,34 @@ impl CounterStorage for CrInMemoryStorage {
 }
 
 impl CrInMemoryStorage {
-    pub fn new(identifier: String, cache_size: u64, local: String, broadcast: String) -> Self {
+    pub fn new(
+        identifier: String,
+        cache_size: u64,
+        local: String,
+        broadcast: Option<String>,
+    ) -> Self {
         let (sender, mut rx) = mpsc::channel(1000);
 
         let local = local.to_socket_addrs().unwrap().next().unwrap();
-        let remote = broadcast.clone();
-        tokio::spawn(async move {
-            let sock = UdpSocket::bind(local).await.unwrap();
-            sock.set_broadcast(true).unwrap();
-            sock.connect(remote).await.unwrap();
-            loop {
-                let message: CounterValueMessage = rx.recv().await.unwrap();
-                let buf = postcard::to_stdvec(&message).unwrap();
-                match sock.send(&buf).await {
-                    Ok(len) => {
-                        if len != buf.len() {
-                            println!("Couldn't send complete message!");
+        if let Some(remote) = broadcast.clone() {
+            tokio::spawn(async move {
+                let sock = UdpSocket::bind(local).await.unwrap();
+                sock.set_broadcast(true).unwrap();
+                sock.connect(remote).await.unwrap();
+                loop {
+                    let message: CounterValueMessage = rx.recv().await.unwrap();
+                    let buf = postcard::to_stdvec(&message).unwrap();
+                    match sock.send(&buf).await {
+                        Ok(len) => {
+                            if len != buf.len() {
+                                println!("Couldn't send complete message!");
+                            }
                         }
-                    }
-                    Err(err) => println!("Couldn't send update: {:?}", err),
-                };
-            }
-        });
+                        Err(err) => println!("Couldn't send update: {:?}", err),
+                    };
+                }
+            });
+        }
 
         let limits_for_namespace = Arc::new(RwLock::new(HashMap::<
             Namespace,
@@ -282,44 +288,53 @@ impl CrInMemoryStorage {
         {
             let limits_for_namespace = limits_for_namespace.clone();
             let qualified_counters = qualified_counters.clone();
-            tokio::spawn(async move {
-                let sock = UdpSocket::bind(broadcast).await.unwrap();
-                sock.set_broadcast(true).unwrap();
-                let mut buf = [0; 1024];
-                loop {
-                    let (len, addr) = sock.recv_from(&mut buf).await.unwrap();
-                    if addr != local {
-                        match postcard::from_bytes::<CounterValueMessage>(&buf[..len]) {
-                            Ok(message) => {
-                                let CounterValueMessage {
-                                    counter_key,
-                                    expiry,
-                                    values,
-                                } = message;
-                                let counter = <CounterKey as Into<Counter>>::into(counter_key);
-                                if counter.is_qualified() {
-                                    if let Some(counter) = qualified_counters.get(&counter) {
-                                        counter.merge(
+
+        if let Some(broadcast) = broadcast.clone() {
+            tokio::spawn(async move {
+                let sock = UdpSocket::bind(broadcast).await.unwrap();
+                sock.set_broadcast(true).unwrap();
+                let mut buf = [0; 1024];
+                loop {
+                    let (len, addr) = sock.recv_from(&mut buf).await.unwrap();
+                    if addr != local {
+                        match postcard::from_bytes::<CounterValueMessage>(&buf[..len]) {
+                            Ok(message) => {
+                                let CounterValueMessage {
+                                    counter_key,
+                                    expiry,
+                                    values,
+                                } = message;
+                                let counter = <CounterKey as Into<Counter>>::into(counter_key);
+                                if counter.is_qualified() {
+                                    if let Some(counter) = qualified_counters.get(&counter) {
+                                        counter.merge(
+                                            (UNIX_EPOCH + Duration::from_secs(expiry), values)
+                                                .into(),
+                                        );
+                                    }
+                                } else {
+                                    let counters = limits_for_namespace.read().unwrap();
+                                    let limits = counters.get(counter.namespace()).unwrap();
+                                    let value = limits.get(counter.limit()).unwrap();
+                                    value.merge(
                                         (UNIX_EPOCH + Duration::from_secs(expiry), values)
                                             .into(),
                                     );
-                                    }
-                                } else {
-                                    let counters = limits_for_namespace.read().unwrap();
-                                    let limits = counters.get(counter.namespace()).unwrap();
-                                    let value = limits.get(counter.limit()).unwrap();
-                                    value.merge(
-                                        (UNIX_EPOCH + Duration::from_secs(expiry), values).into(),
-                                    );
-                                };
-                            }
-                            Err(err) => {
-                                println!("Error from {} bytes: {:?} \n{:?}", len, err, &buf[..len])
+                                };
+                            }
+                            Err(err) => {
+                                println!(
+                                    "Error from {} bytes: {:?} \n{:?}",
+                                    len,
+                                    err,
+                                    &buf[..len]
+                                )
+                            }
                             }
                         }
                     }
-                }
-            });
+            });
+        }
     }
 
         Self {
diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs
index 10ad1f6a..b26d44d5 100644
--- a/limitador/src/storage/in_memory.rs
+++ b/limitador/src/storage/in_memory.rs
@@ -100,7 +100,7 @@ impl CounterStorage for InMemoryStorage {
         delta: u64,
         load_counters: bool,
     ) -> Result<Authorization, StorageErr> {
-        let limits_by_namespace = self.limits_for_namespace.write().unwrap();
+        let limits_by_namespace = self.limits_for_namespace.read().unwrap();
         let mut first_limited = None;
        let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
        let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs
index 2b1e9afe..06c308c2 100644
--- a/limitador/tests/integration_tests.rs
+++ b/limitador/tests/integration_tests.rs
@@ -17,7 +17,7 @@ macro_rules! test_with_all_storage_impls {
             #[tokio::test]
             async fn [<$function _distributed_storage>]() {
                 let rate_limiter =
-                    RateLimiter::new_with_storage(Box::new(CrInMemoryStorage::new("test_node".to_owned(), 10_000, "127.0.0.1:19876".to_owned(), "127.0.0.255:19876".to_owned())));
+                    RateLimiter::new_with_storage(Box::new(CrInMemoryStorage::new("test_node".to_owned(), 10_000, "127.0.0.1:19876".to_owned(), Some("127.0.0.255:19876".to_owned()))));
 
                 $function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await;
             }
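For context, a minimal usage sketch of the changed constructor (not part of the patch; assumes the `distributed_storage` feature and a Tokio runtime, since `new` spawns the UDP tasks when a broadcast peer is given — the node name and addresses are taken from the diff above):

```rust
// Sketch only: illustrates the new `Option<String>` broadcast parameter.
use limitador::storage::distributed::CrInMemoryStorage;

#[tokio::main]
async fn main() {
    // As in the benches: `None` means no broadcast peer, so neither the
    // UDP sender task nor the receiver task is spawned.
    let _bench_style = CrInMemoryStorage::new(
        "test_node".to_owned(),   // node identifier
        10_000,                   // cache size
        "127.0.0.1:0".to_owned(), // local address (any free port)
        None,                     // no broadcasting
    );

    // As in the integration tests: `Some(addr)` keeps the pre-existing
    // behavior of broadcasting counter updates over UDP.
    let _test_style = CrInMemoryStorage::new(
        "test_node".to_owned(),
        10_000,
        "127.0.0.1:19876".to_owned(),
        Some("127.0.0.255:19876".to_owned()),
    );
}
```

Passing `None` is what lets the new `bench_distributed` group measure the storage without any UDP traffic, e.g. via something like `cargo bench --features disk_storage,redis_storage,distributed_storage`, while the integration tests keep exercising the broadcast path.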