From 3191caa9e3baf07251a7bb43f1b7c2458763302f Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Fri, 3 May 2024 08:45:10 -0400 Subject: [PATCH] Batcher tests --- limitador/src/storage/redis/counters_cache.rs | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index caa9a06d..0b7ab962 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -486,6 +486,136 @@ mod tests { } } + mod batcher { + use crate::storage::redis::counters_cache::tests::test_counter; + use crate::storage::redis::counters_cache::{Batcher, CachedCounterValue}; + use std::sync::Arc; + use std::time::{Duration, SystemTime}; + + #[tokio::test] + async fn consume_waits_when_empty() { + let duration = Duration::from_millis(100); + let batcher = Batcher::new(duration); + let start = SystemTime::now(); + batcher + .consume(2, |items| { + assert!(items.is_empty()); + assert!(SystemTime::now().duration_since(start).unwrap() >= duration); + async {} + }) + .await; + } + + #[tokio::test] + async fn consume_waits_when_batch_not_filled() { + let duration = Duration::from_millis(100); + let batcher = Arc::new(Batcher::new(duration)); + let start = SystemTime::now(); + { + let batcher = Arc::clone(&batcher); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(40)).await; + let counter = test_counter(6, None); + let arc = Arc::new(CachedCounterValue::from_authority( + &counter, + 0, + Duration::from_secs(1), + )); + batcher.add(counter, arc); + }); + } + batcher + .consume(2, |items| { + assert_eq!(items.len(), 1); + assert!( + SystemTime::now().duration_since(start).unwrap() + >= Duration::from_millis(100) + ); + async {} + }) + .await; + } + + #[tokio::test] + async fn consume_waits_until_batch_is_filled() { + let duration = Duration::from_millis(100); + let batcher = Arc::new(Batcher::new(duration)); + let start = SystemTime::now(); + { + let batcher = Arc::clone(&batcher); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(40)).await; + let counter = test_counter(6, None); + let arc = Arc::new(CachedCounterValue::from_authority( + &counter, + 0, + Duration::from_secs(1), + )); + batcher.add(counter, arc); + }); + } + batcher + .consume(1, |items| { + assert_eq!(items.len(), 1); + let wait_period = SystemTime::now().duration_since(start).unwrap(); + assert!(wait_period >= Duration::from_millis(40)); + assert!(wait_period < Duration::from_millis(50)); + async {} + }) + .await; + } + + #[tokio::test] + async fn consume_immediately_when_batch_is_filled() { + let duration = Duration::from_millis(100); + let batcher = Arc::new(Batcher::new(duration)); + let start = SystemTime::now(); + { + let counter = test_counter(6, None); + let arc = Arc::new(CachedCounterValue::from_authority( + &counter, + 0, + Duration::from_secs(1), + )); + batcher.add(counter, arc); + } + batcher + .consume(1, |items| { + assert_eq!(items.len(), 1); + assert!( + SystemTime::now().duration_since(start).unwrap() < Duration::from_millis(5) + ); + async {} + }) + .await; + } + + #[tokio::test] + async fn consume_triggers_on_fast_flush() { + let duration = Duration::from_millis(100); + let batcher = Arc::new(Batcher::new(duration)); + let start = SystemTime::now(); + { + let batcher = Arc::clone(&batcher); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(40)).await; + let counter = test_counter(6, None); + let arc = Arc::new(CachedCounterValue::load_from_authority_asap(&counter, 0)); + batcher.add(counter, arc); + }); + } + batcher + .consume(2, |items| { + assert_eq!(items.len(), 1); + let wait_period = SystemTime::now().duration_since(start).unwrap(); + assert!(wait_period >= Duration::from_millis(40)); + assert!(wait_period < Duration::from_millis(50)); + async {} + }) + .await; + } + } + #[test] fn get_existing_counter() { let counter = test_counter(10, None);