
Include the distributed store in the benchmarks.
Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
chirino committed May 18, 2024
1 parent 5989788 commit 0c3c41c
Showing 4 changed files with 140 additions and 60 deletions.
2 changes: 1 addition & 1 deletion limitador-server/src/main.rs
@@ -166,7 +166,7 @@ impl Limiter {
cfg.name,
cfg.cache_size.or_else(guess_cache_size).unwrap(),
cfg.local,
cfg.broadcast,
Some(cfg.broadcast),
);
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));
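The server now wraps cfg.broadcast in Some(...) because CrInMemoryStorage::new (see limitador/src/storage/distributed/mod.rs below) takes the broadcast address as an Option<String>. A minimal sketch of the other side of that change, constructing the store with broadcasting disabled the way the new benchmarks and the integration test do; the node name and cache size are illustrative values, and the runtime mirrors the block_on pattern the benchmarks use:

use limitador::storage::distributed::CrInMemoryStorage;
use limitador::RateLimiter;

fn main() {
    // CrInMemoryStorage::new can spawn Tokio tasks, so build it inside a runtime,
    // mirroring the `runtime.block_on` pattern used in the benchmarks below.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    runtime.block_on(async {
        let storage = CrInMemoryStorage::new(
            "test_node".to_owned(),   // node identifier (illustrative)
            10_000,                   // cache size, same value the benchmarks use
            "127.0.0.1:0".to_owned(), // local bind address; port 0 lets the OS choose
            None,                     // no broadcast peer: replication stays off
        );
        let _limiter = RateLimiter::new_with_storage(Box::new(storage));
    });
}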
81 changes: 73 additions & 8 deletions limitador/benches/bench.rs
@@ -1,18 +1,21 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::time::Instant;

use criterion::{black_box, criterion_group, criterion_main, Bencher, BenchmarkId, Criterion};

Check failure (GitHub Actions / Bench) on line 6 in limitador/benches/bench.rs: unused import: `criterion_group`
use rand::seq::SliceRandom;
use rand::SeedableRng;

use limitador::limit::Limit;
#[cfg(feature = "disk_storage")]
use limitador::storage::disk::{DiskStorage, OptimizeFor};
#[cfg(feature = "distributed_storage")]
use limitador::storage::distributed::CrInMemoryStorage;
use limitador::storage::in_memory::InMemoryStorage;
use limitador::storage::redis::CachedRedisStorageBuilder;
use limitador::storage::{AsyncCounterStorage, CounterStorage};
use limitador::{AsyncRateLimiter, RateLimiter};
use rand::SeedableRng;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::time::Instant;

const SEED: u64 = 42;

@@ -22,13 +25,18 @@ criterion_group!(benches, bench_in_mem);
criterion_group!(benches, bench_in_mem, bench_disk);
#[cfg(all(not(feature = "disk_storage"), feature = "redis_storage"))]
criterion_group!(benches, bench_in_mem, bench_redis, bench_cached_redis);
#[cfg(all(feature = "disk_storage", feature = "redis_storage"))]
#[cfg(all(
feature = "disk_storage",
feature = "redis_storage",
feature = "distributed_storage"
))]
criterion_group!(
benches,
bench_in_mem,
bench_disk,
bench_redis,
bench_cached_redis
bench_cached_redis,
bench_distributed,
);

criterion_main!(benches);

Check failure (GitHub Actions / Bench) on line 42 in limitador/benches/bench.rs: cannot find function `benches` in this scope
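This failure is consistent with the Bench job enabling disk_storage and redis_storage but not distributed_storage: after this change no criterion_group! arm is compiled for that combination, so `benches` is undefined when criterion_main!(benches) expands. That reading of the CI feature set is an assumption; if it holds, an extra arm along these lines (a sketch, not part of this commit) would keep the combination covered:

#[cfg(all(
    feature = "disk_storage",
    feature = "redis_storage",
    not(feature = "distributed_storage")
))]
criterion_group!(
    benches,
    bench_in_mem,
    bench_disk,
    bench_redis,
    bench_cached_redis
);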
@@ -79,7 +87,7 @@ impl Display for TestScenario {
}

fn bench_in_mem(c: &mut Criterion) {
let mut group = c.benchmark_group("In memory");
let mut group = c.benchmark_group("Memory");
for scenario in TEST_SCENARIOS {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
Expand Down Expand Up @@ -109,6 +117,63 @@ fn bench_in_mem(c: &mut Criterion) {
group.finish();
}

#[cfg(feature = "distributed_storage")]
fn bench_distributed(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let mut group = c.benchmark_group("Distributed");
for scenario in TEST_SCENARIOS {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
runtime.block_on(async move {
let storage = Box::new(CrInMemoryStorage::new(
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
));
bench_is_rate_limited(b, test_scenario, storage);
})
},
);
group.bench_with_input(
BenchmarkId::new("update_counters", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
runtime.block_on(async move {
let storage = Box::new(CrInMemoryStorage::new(
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
));
bench_update_counters(b, test_scenario, storage);
})
},
);
group.bench_with_input(
BenchmarkId::new("check_rate_limited_and_update", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
runtime.block_on(async move {
let storage = Box::new(CrInMemoryStorage::new(
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
));
bench_check_rate_limited_and_update(b, test_scenario, storage);
})
},
);
}
group.finish();
}
#[cfg(feature = "disk_storage")]
fn bench_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Disk");
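The three bench closures in bench_distributed repeat the same storage construction. A hypothetical helper (name and placement are illustrative, not part of this commit) that captures what each iteration builds:

#[cfg(feature = "distributed_storage")]
fn distributed_bench_storage() -> Box<CrInMemoryStorage> {
    // With `None` for the broadcast address, the updated constructor skips the UDP
    // send/receive tasks, so the benchmark measures only the local counter path.
    Box::new(CrInMemoryStorage::new(
        "test_node".to_owned(),
        10_000,
        "127.0.0.1:0".to_owned(),
        None,
    ))
}

Running the new group presumably requires enabling the distributed_storage feature together with disk_storage and redis_storage, per the cfg gate on the criterion_group! arm above.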
115 changes: 65 additions & 50 deletions limitador/src/storage/distributed/mod.rs
@@ -249,28 +249,34 @@ impl CounterStorage for CrInMemoryStorage {
}

impl CrInMemoryStorage {
pub fn new(identifier: String, cache_size: u64, local: String, broadcast: String) -> Self {
pub fn new(
identifier: String,
cache_size: u64,
local: String,
broadcast: Option<String>,
) -> Self {
let (sender, mut rx) = mpsc::channel(1000);

let local = local.to_socket_addrs().unwrap().next().unwrap();
let remote = broadcast.clone();
tokio::spawn(async move {
let sock = UdpSocket::bind(local).await.unwrap();
sock.set_broadcast(true).unwrap();
sock.connect(remote).await.unwrap();
loop {
let message: CounterValueMessage = rx.recv().await.unwrap();
let buf = postcard::to_stdvec(&message).unwrap();
match sock.send(&buf).await {
Ok(len) => {
if len != buf.len() {
println!("Couldn't send complete message!");
if let Some(remote) = broadcast.clone() {
tokio::spawn(async move {
let sock = UdpSocket::bind(local).await.unwrap();
sock.set_broadcast(true).unwrap();
sock.connect(remote).await.unwrap();
loop {
let message: CounterValueMessage = rx.recv().await.unwrap();
let buf = postcard::to_stdvec(&message).unwrap();
match sock.send(&buf).await {
Ok(len) => {
if len != buf.len() {
println!("Couldn't send complete message!");
}
}
}
Err(err) => println!("Couldn't send update: {:?}", err),
};
}
});
Err(err) => println!("Couldn't send update: {:?}", err),
};
}
});
}

let limits_for_namespace = Arc::new(RwLock::new(HashMap::<
Namespace,
@@ -282,44 +288,53 @@ impl CrInMemoryStorage {
{
let limits_for_namespace = limits_for_namespace.clone();
let qualified_counters = qualified_counters.clone();
tokio::spawn(async move {
let sock = UdpSocket::bind(broadcast).await.unwrap();
sock.set_broadcast(true).unwrap();
let mut buf = [0; 1024];
loop {
let (len, addr) = sock.recv_from(&mut buf).await.unwrap();
if addr != local {
match postcard::from_bytes::<CounterValueMessage>(&buf[..len]) {
Ok(message) => {
let CounterValueMessage {
counter_key,
expiry,
values,
} = message;
let counter = <CounterKey as Into<Counter>>::into(counter_key);
if counter.is_qualified() {
if let Some(counter) = qualified_counters.get(&counter) {
counter.merge(

if let Some(broadcast) = broadcast.clone() {
tokio::spawn(async move {
let sock = UdpSocket::bind(broadcast).await.unwrap();
sock.set_broadcast(true).unwrap();
let mut buf = [0; 1024];
loop {
let (len, addr) = sock.recv_from(&mut buf).await.unwrap();
if addr != local {
match postcard::from_bytes::<CounterValueMessage>(&buf[..len]) {
Ok(message) => {
let CounterValueMessage {
counter_key,
expiry,
values,
} = message;
let counter = <CounterKey as Into<Counter>>::into(counter_key);
if counter.is_qualified() {
if let Some(counter) = qualified_counters.get(&counter) {
counter.merge(
(UNIX_EPOCH + Duration::from_secs(expiry), values)
.into(),
);
}
} else {
let counters = limits_for_namespace.read().unwrap();
let limits = counters.get(counter.namespace()).unwrap();
let value = limits.get(counter.limit()).unwrap();
value.merge(
(UNIX_EPOCH + Duration::from_secs(expiry), values)
.into(),
);
}
} else {
let counters = limits_for_namespace.read().unwrap();
let limits = counters.get(counter.namespace()).unwrap();
let value = limits.get(counter.limit()).unwrap();
value.merge(
(UNIX_EPOCH + Duration::from_secs(expiry), values).into(),
);
};
}
Err(err) => {
println!("Error from {} bytes: {:?} \n{:?}", len, err, &buf[..len])
};
}
Err(err) => {
println!(
"Error from {} bytes: {:?} \n{:?}",
len,
err,
&buf[..len]
)
}
}
}
}
}
});
});
}
}

Self {
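The broadcast path shown above serializes a CounterValueMessage with postcard::to_stdvec before sending it over UDP and decodes it on receipt with postcard::from_bytes. A stand-alone sketch of that encode/decode pair, using a stand-in struct whose fields only mirror the ones visible in this diff (counter_key, expiry, values); the real types live in limitador's distributed storage module:

use serde::{Deserialize, Serialize};

// Stand-in for CounterValueMessage; the field types here are illustrative.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct ExampleMessage {
    counter_key: String,        // the real message carries a CounterKey
    expiry: u64,                // seconds since UNIX_EPOCH, as used in the merge() calls above
    values: Vec<(String, u64)>, // per-node counter values (illustrative shape)
}

fn main() {
    let msg = ExampleMessage {
        counter_key: "example.org/per_minute".to_owned(),
        expiry: 1_716_000_000,
        values: vec![("test_node".to_owned(), 3)],
    };
    // Same encode/decode pair the storage uses.
    let buf = postcard::to_stdvec(&msg).unwrap();
    let decoded: ExampleMessage = postcard::from_bytes(&buf).unwrap();
    assert_eq!(msg, decoded);
}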
2 changes: 1 addition & 1 deletion limitador/tests/integration_tests.rs
@@ -17,7 +17,7 @@ macro_rules! test_with_all_storage_impls {
#[tokio::test]
async fn [<$function _distributed_storage>]() {
let rate_limiter =
RateLimiter::new_with_storage(Box::new(CrInMemoryStorage::new("test_node".to_owned(), 10_000, "127.0.0.1:19876".to_owned(), "127.0.0.255:19876".to_owned())));
RateLimiter::new_with_storage(Box::new(CrInMemoryStorage::new("test_node".to_owned(), 10_000, "127.0.0.1:19876".to_owned(), Some("127.0.0.255:19876".to_owned()))));
$function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await;
}

