diff --git a/Cargo.lock b/Cargo.lock index 5652beea..75385697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,7 +49,7 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rand", + "rand 0.8.5", "sha1 0.10.5", "smallvec", "tokio", @@ -288,6 +288,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-polyfill" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28" +dependencies = [ + "critical-section", +] + [[package]] name = "atty" version = "0.2.14" @@ -362,6 +371,26 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +[[package]] +name = "bindgen" +version = "0.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -404,6 +433,12 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.4.0" @@ -419,6 +454,17 @@ dependencies = [ "bytes", ] +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cast" version = "0.3.0" @@ -434,6 +480,15 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -467,6 +522,17 @@ dependencies = [ "half", ] +[[package]] +name = "clang-sys" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c688fc74432808e3eb684cae8830a86be1d66a2bd58e1f248ed0960a590baf6f" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "3.2.23" @@ -491,6 +557,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "cobs" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" + [[package]] name = "combine" version = "4.6.6" @@ -592,6 +664,12 @@ dependencies = [ "itertools", ] +[[package]] +name = "critical-section" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" + [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -806,6 +884,12 @@ dependencies = [ "libc", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = 
"futures" version = "0.3.27" @@ -916,6 +1000,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.18" @@ -941,12 +1031,35 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "heapless" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "serde", + "spin", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -1205,31 +1318,79 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + +[[package]] +name = "librocksdb-sys" +version = "0.10.0+7.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ee889ecc9568871456d42f603d6a0ce59ff328d291063a45cbdf0036baf6db" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "limitador" -version = "0.4.0-dev" +version = "0.5.0-dev" dependencies = [ "async-trait", + "base64 0.21.0", "cfg-if", "criterion", "futures", "infinispan", "lazy_static", "paste", + "postcard", "prometheus", "r2d2", - "rand", + "rand 0.8.5", "redis", "reqwest", + "rmp-serde", + "rocksdb", "serde", "serde_json", "serial_test", + "tempdir", "thiserror", "tokio", "ttl_cache", @@ -1237,7 +1398,7 @@ dependencies = [ [[package]] name = "limitador-server" -version = "1.1.0-dev" +version = "1.3.0-dev" dependencies = [ "actix-rt", "actix-web", @@ -1308,6 +1469,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] + [[package]] name = 
"matchit" version = "0.7.0" @@ -1335,6 +1506,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.6.2" @@ -1380,6 +1557,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "notify" version = "5.1.0" @@ -1583,6 +1770,12 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.2.0" @@ -1665,6 +1858,17 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "postcard" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfa512cd0d087cc9f99ad30a1bf64795b67871edbead083ffc3a4dfafa59aa00" +dependencies = [ + "cobs", + "heapless", + "serde", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1809,6 +2013,19 @@ dependencies = [ "scheduled-thread-pool", ] +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.8.5" @@ -1817,7 +2034,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -1827,9 +2044,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", ] +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.6.4" @@ -1861,6 +2093,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redis" version = "0.21.7" @@ -1909,6 +2150,15 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi", +] + [[package]] name = "reqwest" version = "0.11.14" @@ -1946,6 +2196,44 @@ dependencies = [ "winreg", ] +[[package]] +name = "rmp" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b13be192e0220b8afb7222aa5813cb62cc269ebb5cac346ca6487681d2913e" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + +[[package]] +name = "rocksdb" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99" +dependencies = [ + "libc", + "librocksdb-sys", +] + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.0" @@ -2163,6 +2451,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" +[[package]] +name = "shlex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2197,6 +2491,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.10.0" @@ -2239,6 +2548,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.4.0" @@ -2471,7 +2790,7 @@ dependencies = [ "indexmap", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", diff --git a/Dockerfile b/Dockerfile index 5be793a1..d46338bf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN dnf -y --setopt=install_weak_deps=False --setopt=tsflags=nodocs install \ && dnf -y --setopt=install_weak_deps=False --setopt=tsflags=nodocs install epel-release \ && dnf config-manager --set-enabled powertools -RUN PKGS="gcc-c++ gcc-toolset-12-binutils-gold openssl3-devel protobuf-c protobuf-devel git" \ +RUN PKGS="gcc-c++ gcc-toolset-12-binutils-gold openssl3-devel protobuf-c protobuf-devel git clang kernel-headers" \ && dnf install --nodocs --assumeyes $PKGS \ && rpm --verify --nogroup --nouser $PKGS \ && yum -y clean all @@ -39,7 
+39,7 @@ RUN source $HOME/.cargo/env \
 FROM registry.access.redhat.com/ubi8/ubi-minimal:8.7
 
 # shadow-utils is required for `useradd`
-RUN PKGS="libgcc shadow-utils" \
+RUN PKGS="libgcc libstdc++ shadow-utils" \
  && microdnf --assumeyes install --nodocs $PKGS \
  && rpm --verify --nogroup --nouser $PKGS \
  && microdnf -y clean all
diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs
index db63fb4b..5ab21afe 100644
--- a/limitador-server/src/config.rs
+++ b/limitador-server/src/config.rs
@@ -19,6 +19,7 @@
 // HTTP_API_PORT: port
 
 use crate::envoy_rls::server::RateLimitHeaders;
+use limitador::storage;
 use log::LevelFilter;
 
 #[derive(Debug)]
@@ -92,11 +93,18 @@ impl Default for Configuration {
 #[derive(PartialEq, Eq, Debug)]
 pub enum StorageConfiguration {
     InMemory,
+    Disk(DiskStorageConfiguration),
     Redis(RedisStorageConfiguration),
     #[cfg(feature = "infinispan")]
     Infinispan(InfinispanStorageConfiguration),
 }
 
+#[derive(PartialEq, Eq, Debug)]
+pub struct DiskStorageConfiguration {
+    pub path: String,
+    pub optimization: storage::disk::OptimizeFor,
+}
+
 #[derive(PartialEq, Eq, Debug)]
 pub struct RedisStorageConfiguration {
     pub url: String,
diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs
index 14235bae..0eb0e801 100644
--- a/limitador-server/src/main.rs
+++ b/limitador-server/src/main.rs
@@ -7,7 +7,8 @@ extern crate clap;
 #[cfg(feature = "infinispan")]
 use crate::config::InfinispanStorageConfiguration;
 use crate::config::{
-    Configuration, RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
+    Configuration, DiskStorageConfiguration, RedisStorageCacheConfiguration,
+    RedisStorageConfiguration, StorageConfiguration,
 };
 use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
 use crate::http_api::server::run_http_server;
@@ -15,6 +16,7 @@ use clap::{App, Arg, SubCommand};
 use env_logger::Builder;
 use limitador::errors::LimitadorError;
 use limitador::limit::Limit;
+use limitador::storage::disk::DiskStorage;
 #[cfg(feature = "infinispan")]
 use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
 #[cfg(feature = "infinispan")]
@@ -26,8 +28,10 @@ use limitador::storage::redis::{
     DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
     DEFAULT_TTL_RATIO_CACHED_COUNTERS,
 };
-use limitador::storage::{AsyncCounterStorage, AsyncStorage};
-use limitador::{AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder};
+use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
+use limitador::{
+    storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder,
+};
 use log::LevelFilter;
 use notify::event::{ModifyKind, RenameMode};
 use notify::{Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
@@ -82,6 +86,7 @@ impl Limiter {
                 Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
             }
             StorageConfiguration::InMemory => Self::in_memory_limiter(config),
+            StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
         };
 
         Ok(rate_limiter)
@@ -199,6 +204,24 @@ impl Limiter {
         Self::Async(rate_limiter_builder.build())
     }
 
+    fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self {
+        let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) {
+            Ok(storage) => storage,
+            Err(err) => {
+                eprintln!("Failed to open DB at {}: {err}", cfg.path);
+                process::exit(1)
+            }
+        };
+        let mut rate_limiter_builder =
+            RateLimiterBuilder::new().storage(Storage::with_counter_storage(Box::new(storage)));
+
+        if limit_name_in_labels {
+            rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
+        }
+
+        Self::Blocking(rate_limiter_builder.build())
+    }
+
     fn in_memory_limiter(cfg: Configuration) -> Self {
         let mut rate_limiter_builder = RateLimiterBuilder::new();
 
@@ -402,6 +425,9 @@ fn create_config() -> (Configuration, String) {
     let default_http_port =
         env::var("HTTP_API_PORT").unwrap_or_else(|_| Configuration::DEFAULT_HTTP_PORT.to_string());
 
+    let disk_path = env::var("DISK_PATH").unwrap_or_else(|_| "".to_string());
+    let disk_optimize = env::var("DISK_OPTIMIZE").unwrap_or_else(|_| "throughput".to_string());
+
     let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "".to_string());
 
     let redis_cached_ttl_default = env::var("REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS")
@@ -438,6 +464,13 @@ fn create_config() -> (Configuration, String) {
         redis_url_arg.default_value(&redis_url)
     };
 
+    let disk_path_arg = Arg::with_name("PATH").help("Path to counter DB").index(1);
+    let disk_path_arg = if disk_path.is_empty() {
+        disk_path_arg.required(true)
+    } else {
+        disk_path_arg.default_value(&disk_path)
+    };
+
     // build app
     let cmdline = App::new(LIMITADOR_HEADER)
         .version(full_version.as_str())
@@ -520,15 +553,33 @@ fn create_config() -> (Configuration, String) {
                 .about("Counters are held in Limitador (ephemeral)"),
         )
         .subcommand(
-            SubCommand::with_name("redis")
+            SubCommand::with_name("disk")
                 .display_order(2)
+                .about("Counters are held on disk (persistent)")
+                .arg(disk_path_arg)
+                .arg(
+                    Arg::with_name("OPTIMIZE")
+                        .long("optimize")
+                        .takes_value(true)
+                        .display_order(1)
+                        .default_value(&disk_optimize)
+                        .value_parser(clap::builder::PossibleValuesParser::new([
+                            "throughput",
+                            "disk",
+                        ]))
+                        .help("Optimizes either to save disk space or for higher throughput"),
+                ),
+        )
+        .subcommand(
+            SubCommand::with_name("redis")
+                .display_order(3)
                 .about("Uses Redis to store counters")
                 .arg(redis_url_arg.clone()),
         )
         .subcommand(
             SubCommand::with_name("redis_cached")
                 .about("Uses Redis to store counters, with an in-memory cache")
-                .display_order(3)
+                .display_order(4)
                 .arg(redis_url_arg)
                 .arg(
                     Arg::with_name("TTL")
@@ -572,7 +623,7 @@ fn create_config() -> (Configuration, String) {
     let cmdline = cmdline.subcommand(
         SubCommand::with_name("infinispan")
             .about("Uses Infinispan to store counters")
-            .display_order(4)
+            .display_order(5)
             .arg(
                 Arg::with_name("URL")
                     .help("Infinispan URL to use")
@@ -648,6 +699,14 @@ fn create_config() -> (Configuration, String) {
             url: sub.value_of("URL").unwrap().to_owned(),
             cache: None,
         }),
+        Some(("disk", sub)) => StorageConfiguration::Disk(DiskStorageConfiguration {
+            path: sub.value_of("PATH").expect("We need a path!").to_string(),
+            optimization: match sub.value_of("OPTIMIZE") {
+                Some("disk") => storage::disk::OptimizeFor::Space,
+                Some("throughput") => storage::disk::OptimizeFor::Throughput,
+                _ => unreachable!("Some disk OptimizeFor wasn't configured!"),
+            },
+        }),
        Some(("redis_cached", sub)) => StorageConfiguration::Redis(RedisStorageConfiguration {
            url: sub.value_of("URL").unwrap().to_owned(),
            cache: Some(RedisStorageCacheConfiguration {
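Outside of the server wiring above, the same storage can be handed to the library's blocking rate limiter directly. A rough sketch mirroring `disk_limiter` (the path literal is an arbitrary example, not taken from the patch):

```rust
use limitador::storage::disk::{DiskStorage, OptimizeFor};
use limitador::storage::Storage;
use limitador::{RateLimiter, RateLimiterBuilder};

// Open (or create) the RocksDB-backed counter store and plug it into the
// blocking RateLimiter, exactly as the server's disk_limiter does above.
fn build_disk_limiter() -> RateLimiter {
    let storage = DiskStorage::open("/tmp/limitador-counters", OptimizeFor::Throughput)
        .expect("failed to open the counter DB");
    RateLimiterBuilder::new()
        .storage(Storage::with_counter_storage(Box::new(storage)))
        .build()
}
```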
diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index 14d327d6..40757602 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -14,15 +14,18 @@ edition = "2021"
 # We make redis and infinispan optional to be able to compile for wasm32.
 
 [features]
-default = ["redis_storage"]
+default = ["disk_storage", "redis_storage"]
+disk_storage = ["rocksdb"]
 redis_storage = ["redis", "r2d2", "tokio"]
-infinispan_storage = ["infinispan", "reqwest"]
+infinispan_storage = ["infinispan", "reqwest", "base64"]
 lenient_conditions = []
 
 [dependencies]
 ttl_cache = "0.5"
 serde = { version = "1", features = ["derive"] }
+postcard = { version = "1.0.4", features = ["use-std"] }
 serde_json = "1"
+rmp-serde = "1.1.0"
 thiserror = "1"
 futures = "0.3"
 async-trait = "0.1"
@@ -31,6 +34,7 @@ prometheus = "0.13"
 lazy_static = "1"
 
 # Optional dependencies
+rocksdb = { version = "0.20.1", optional = true, features = ["multi-threaded-cf"] }
 redis = { version = "0.21", optional = true, features = [
     "connection-manager",
     "tokio-comp",
@@ -43,12 +47,14 @@ tokio = { version = "1", optional = true, features = [
 ] }
 infinispan = { version = "0.3", optional = true }
 reqwest = { version = "0.11", optional = true }
+base64 = { version = "0.21.0", optional = true }
 
 [dev-dependencies]
 serial_test = "0.9"
 criterion = { version = "0.4", features = ["html_reports"] }
 paste = "1"
 rand = "0.8"
+tempdir = "0.3.7"
 tokio = { version = "1", features = [
     "rt-multi-thread",
     "macros",
diff --git a/limitador/benches/bench.rs b/limitador/benches/bench.rs
index 3a9e22cb..e7d99f32 100644
--- a/limitador/benches/bench.rs
+++ b/limitador/benches/bench.rs
@@ -2,6 +2,9 @@ use criterion::{black_box, criterion_group, criterion_main, Bencher, BenchmarkId
 use rand::seq::SliceRandom;
 
 use limitador::limit::Limit;
+#[cfg(feature = "disk_storage")]
+use limitador::storage::disk::{DiskStorage, OptimizeFor};
+use limitador::storage::in_memory::InMemoryStorage;
 use limitador::storage::CounterStorage;
 use limitador::RateLimiter;
 use rand::SeedableRng;
@@ -10,10 +13,14 @@ use std::fmt::{Display, Formatter};
 
 const SEED: u64 = 42;
 
-#[cfg(not(feature = "redis"))]
+#[cfg(all(not(feature = "disk_storage"), not(feature = "redis_storage")))]
 criterion_group!(benches, bench_in_mem);
-#[cfg(feature = "redis")]
+#[cfg(all(feature = "disk_storage", not(feature = "redis_storage")))]
+criterion_group!(benches, bench_in_mem, bench_disk);
+#[cfg(all(not(feature = "disk_storage"), feature = "redis_storage"))]
 criterion_group!(benches, bench_in_mem, bench_redis);
+#[cfg(all(feature = "disk_storage", feature = "redis_storage"))]
+criterion_group!(benches, bench_in_mem, bench_disk, bench_redis);
 
 criterion_main!(benches);
 
@@ -69,7 +76,7 @@ fn bench_in_mem(c: &mut Criterion) {
             BenchmarkId::new("is_rate_limited", scenario),
             scenario,
             |b: &mut Bencher, test_scenario: &&TestScenario| {
-                let storage = Box::::default();
+                let storage = Box::<InMemoryStorage>::default();
                 bench_is_rate_limited(b, test_scenario, storage);
             },
         );
@@ -77,7 +84,7 @@ fn bench_in_mem(c: &mut Criterion) {
             BenchmarkId::new("update_counters", scenario),
             scenario,
             |b: &mut Bencher, test_scenario: &&TestScenario| {
-                let storage = Box::::default();
+                let storage = Box::<InMemoryStorage>::default();
                 bench_update_counters(b, test_scenario, storage);
             },
         );
@@ -85,7 +92,7 @@ fn bench_in_mem(c: &mut Criterion) {
             BenchmarkId::new("check_rate_limited_and_update", scenario),
             scenario,
             |b: &mut Bencher, test_scenario: &&TestScenario| {
-                let storage = Box::::default();
+                let storage = Box::<InMemoryStorage>::default();
                 bench_check_rate_limited_and_update(b, test_scenario, storage);
             },
         );
@@ -93,7 +100,48 @@ fn bench_in_mem(c: &mut Criterion) {
     }
     group.finish();
 }
-#[cfg(feature = "redis")]
+#[cfg(feature = "disk_storage")]
+fn bench_disk(c: &mut Criterion) {
+    let mut group = c.benchmark_group("Disk");
+    for (index, scenario) in TEST_SCENARIOS.iter().enumerate() {
+        group.bench_with_input(
+            BenchmarkId::new("is_rate_limited", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                let prefix = format!("limitador-disk-bench-{index}-is_rate_limited");
+                let tmp = tempdir::TempDir::new(&prefix).expect("We should have a dir!");
+                let storage =
+                    Box::new(DiskStorage::open(tmp.path(), OptimizeFor::Throughput).unwrap());
+                bench_is_rate_limited(b, test_scenario, storage);
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("update_counters", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                let prefix = format!("limitador-disk-bench-{index}-update_counters");
+                let tmp = tempdir::TempDir::new(&prefix).expect("We should have a dir!");
+                let storage =
+                    Box::new(DiskStorage::open(tmp.path(), OptimizeFor::Throughput).unwrap());
+                bench_update_counters(b, test_scenario, storage);
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("check_rate_limited_and_update", scenario),
+            scenario,
+            |b: &mut Bencher, test_scenario: &&TestScenario| {
+                let prefix = format!("limitador-disk-bench-{index}-check_rate_limited_and_update");
+                let tmp = tempdir::TempDir::new(&prefix).expect("We should have a dir!");
+                let storage =
+                    Box::new(DiskStorage::open(tmp.path(), OptimizeFor::Throughput).unwrap());
+                bench_check_rate_limited_and_update(b, test_scenario, storage);
+            },
+        );
+    }
+    group.finish();
+}
+
+#[cfg(feature = "redis_storage")]
 fn bench_redis(c: &mut Criterion) {
     let mut group = c.benchmark_group("Redis");
     for scenario in TEST_SCENARIOS {
diff --git a/limitador/src/counter.rs b/limitador/src/counter.rs
index f0850cfa..a83485ac 100644
--- a/limitador/src/counter.rs
+++ b/limitador/src/counter.rs
@@ -86,6 +86,16 @@ impl Counter {
     pub fn set_expires_in(&mut self, duration: Duration) {
         self.expires_in = Some(duration)
     }
+
+    #[cfg(feature = "disk_storage")]
+    pub(crate) fn variables_for_key(&self) -> Vec<(&str, &str)> {
+        let mut variables = Vec::with_capacity(self.set_variables.len());
+        for (var, value) in &self.set_variables {
+            variables.push((var.as_str(), value.as_str()));
+        }
+        variables.sort_by(|(key1, _), (key2, _)| key1.cmp(key2));
+        variables
+    }
 }
 
 impl Hash for Counter {
diff --git a/limitador/src/limit.rs b/limitador/src/limit.rs
index 2c2ba6e6..db6e944a 100644
--- a/limitador/src/limit.rs
+++ b/limitador/src/limit.rs
@@ -50,7 +50,6 @@ impl From<String> for Namespace {
 }
 
 #[derive(Eq, Debug, Clone, Serialize, Deserialize)]
-#[serde(deny_unknown_fields)]
 pub struct Limit {
     namespace: Namespace,
     #[serde(skip_serializing, default)]
@@ -367,6 +366,16 @@ impl Limit {
         self.variables.iter().map(|var| var.into()).collect()
     }
 
+    #[cfg(feature = "disk_storage")]
+    pub(crate) fn variables_for_key(&self) -> Vec<&str> {
+        let mut variables = Vec::with_capacity(self.variables.len());
+        for var in &self.variables {
+            variables.push(var.as_str());
+        }
+        variables.sort();
+        variables
+    }
+
     pub fn has_variable(&self, var: &str) -> bool {
         self.variables.contains(var)
     }
diff --git a/limitador/src/storage/disk/expiring_value.rs b/limitador/src/storage/disk/expiring_value.rs
new file mode 100644
index 00000000..cdfd5d5e
--- /dev/null
+++ b/limitador/src/storage/disk/expiring_value.rs
@@ -0,0 +1,158 @@
+use crate::storage::StorageErr;
+use std::array::TryFromSliceError;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+#[derive(Clone, Debug)]
+pub(crate) struct ExpiringValue {
+    value: i64,
+    expiry: SystemTime,
+}
+
+impl ExpiringValue {
+    pub fn new(value: i64, expiry: SystemTime) -> Self {
+        Self { value, expiry }
+    }
+
+    pub fn value_at(&self, when: SystemTime) -> i64 {
+        if self.expiry <= when {
+            return 0;
+        }
+        self.value
+    }
+
+    pub fn value(&self) -> i64 {
+        self.value_at(SystemTime::now())
+    }
+
+    pub fn update(self, delta: i64, ttl: u64, now: SystemTime) -> Self {
+        let expiry = if self.expiry <= now {
+            now + Duration::from_secs(ttl)
+        } else {
+            self.expiry
+        };
+
+        let value = self.value_at(now) + delta;
+        Self { value, expiry }
+    }
+
+    pub fn merge(self, other: ExpiringValue, now: SystemTime) -> Self {
+        if self.expiry > now {
+            ExpiringValue {
+                value: self.value + other.value,
+                expiry: self.expiry,
+            }
+        } else {
+            other
+        }
+    }
+
+    pub fn ttl(&self) -> Duration {
+        self.expiry
+            .duration_since(SystemTime::now())
+            .unwrap_or(Duration::ZERO)
+    }
+}
+
+impl Default for ExpiringValue {
+    fn default() -> Self {
+        ExpiringValue {
+            value: 0,
+            expiry: SystemTime::UNIX_EPOCH,
+        }
+    }
+}
+
+impl TryFrom<&[u8]> for ExpiringValue {
+    type Error = TryFromSliceError;
+
+    fn try_from(raw: &[u8]) -> Result<Self, Self::Error> {
+        let raw_val: [u8; 8] = raw[0..8].try_into()?;
+        let raw_exp: [u8; 8] = raw[8..16].try_into()?;
+
+        let val = i64::from_be_bytes(raw_val);
+        let exp = u64::from_be_bytes(raw_exp);
+
+        Ok(Self {
+            value: val,
+            expiry: UNIX_EPOCH + Duration::from_secs(exp),
+        })
+    }
+}
+
+impl From<ExpiringValue> for Vec<u8> {
+    fn from(value: ExpiringValue) -> Self {
+        let val: [u8; 8] = value.value.to_be_bytes();
+        let exp: [u8; 8] = value
+            .expiry
+            .duration_since(UNIX_EPOCH)
+            .expect("Can't expire before Epoch")
+            .as_secs()
+            .to_be_bytes();
+
+        [val, exp].concat()
+    }
+}
+
+impl From<TryFromSliceError> for StorageErr {
+    fn from(_: TryFromSliceError) -> Self {
+        Self {
+            msg: "Corrupted byte sequence while reading 8 bytes for 64-bit integer".to_owned(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::ExpiringValue;
+    use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+    #[test]
+    fn returns_value_when_valid() {
+        let now = SystemTime::now();
+        let val = ExpiringValue::new(42, now);
+        assert_eq!(val.value_at(now - Duration::from_secs(1)), 42);
+    }
+
+    #[test]
+    fn returns_default_when_expired() {
+        let now = SystemTime::now();
+        let val = ExpiringValue::new(42, now - Duration::from_secs(1));
+        assert_eq!(val.value_at(now), 0);
+    }
+
+    #[test]
+    fn returns_default_on_expiry() {
+        let now = SystemTime::now();
+        let val = ExpiringValue::new(42, now);
+        assert_eq!(val.value_at(now), 0);
+    }
+
+    #[test]
+    fn updates_when_valid() {
+        let now = SystemTime::now();
+        let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now);
+        assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
+    }
+
+    #[test]
+    fn updates_when_expired() {
+        let now = SystemTime::now();
+        let val = ExpiringValue::new(42, now);
+        assert_eq!(val.ttl(), Duration::ZERO);
+        let val = val.update(3, 10, now);
+        assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
+    }
+
+    #[test]
+    fn from_into_vec() {
+        let now = SystemTime::now();
+        let val = ExpiringValue::new(42, now);
+        let raw: Vec<u8> = val.into();
+        let back: ExpiringValue = raw.as_slice().try_into().unwrap();
+
+        assert_eq!(back.value, 42);
+        assert_eq!(
+            back.expiry.duration_since(UNIX_EPOCH).unwrap().as_secs(),
+            now.duration_since(UNIX_EPOCH).unwrap().as_secs()
+        );
+    }
+}
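The on-disk encoding that the `TryFrom<&[u8]>` and `From<ExpiringValue> for Vec<u8>` impls agree on is a fixed 16-byte layout. A self-contained sketch with example values only (not part of the patch):

```rust
// bytes 0..8  hold the counter value as a big-endian i64,
// bytes 8..16 hold the expiry as big-endian u64 seconds since the Unix epoch.
fn main() {
    let (value, expiry_secs): (i64, u64) = (42, 1_700_000_000);
    let raw: Vec<u8> = [value.to_be_bytes(), expiry_secs.to_be_bytes()].concat();

    assert_eq!(raw.len(), 16);
    assert_eq!(i64::from_be_bytes(raw[0..8].try_into().unwrap()), 42);
    assert_eq!(u64::from_be_bytes(raw[8..16].try_into().unwrap()), 1_700_000_000);
}
```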
diff --git a/limitador/src/storage/disk/mod.rs b/limitador/src/storage/disk/mod.rs
new file mode 100644
index 00000000..011f6e2a
--- /dev/null
+++ b/limitador/src/storage/disk/mod.rs
@@ -0,0 +1,20 @@
+use crate::storage::StorageErr;
+
+mod expiring_value;
+mod rocksdb_storage;
+
+pub use rocksdb_storage::RocksDbStorage as DiskStorage;
+
+impl From<rocksdb::Error> for StorageErr {
+    fn from(error: rocksdb::Error) -> Self {
+        Self {
+            msg: format!("Underlying storage error: {error}"),
+        }
+    }
+}
+
+#[derive(PartialEq, Eq, Debug)]
+pub enum OptimizeFor {
+    Space,
+    Throughput,
+}
diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs
new file mode 100644
index 00000000..d6b1c2f6
--- /dev/null
+++ b/limitador/src/storage/disk/rocksdb_storage.rs
@@ -0,0 +1,233 @@
+use crate::counter::Counter;
+use crate::limit::Limit;
+use crate::storage::disk::expiring_value::ExpiringValue;
+use crate::storage::disk::OptimizeFor;
+use crate::storage::keys::bin::{
+    key_for_counter, partial_counter_from_counter_key, prefix_for_namespace,
+};
+use crate::storage::{Authorization, CounterStorage, StorageErr};
+use rocksdb::{
+    CompactionDecision, DBCompressionType, DBWithThreadMode, IteratorMode, MultiThreaded, Options,
+    DB,
+};
+use std::collections::{BTreeSet, HashSet};
+use std::time::{Duration, SystemTime};
+
+pub struct RocksDbStorage {
+    db: DBWithThreadMode<MultiThreaded>,
+}
+
+impl CounterStorage for RocksDbStorage {
+    fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
+        let key = key_for_counter(counter);
+        let value = self.insert_or_update(&key, counter, 0)?;
+        Ok(counter.max_value() >= value.value() + delta)
+    }
+
+    fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
+        let key = key_for_counter(counter);
+        self.insert_or_update(&key, counter, delta)?;
+        Ok(())
+    }
+
+    fn check_and_update(
+        &self,
+        counters: &mut Vec<Counter>,
+        delta: i64,
+        load_counters: bool,
+    ) -> Result<Authorization, StorageErr> {
+        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(counters.len());
+
+        for counter in &mut *counters {
+            let key = key_for_counter(counter);
+            let slice: &[u8] = key.as_ref();
+            let (val, ttl) = match self.db.get(slice)? {
+                None => (0, Duration::from_secs(counter.limit().seconds())),
+                Some(raw) => {
+                    let slice: &[u8] = raw.as_ref();
+                    let value: ExpiringValue = slice.try_into()?;
+                    (value.value(), value.ttl())
+                }
+            };
+
+            if load_counters {
+                counter.set_expires_in(ttl);
+                counter.set_remaining(counter.max_value() - val - delta);
+            }
+
+            if counter.max_value() < val + delta {
+                return Ok(Authorization::Limited(
+                    counter.limit().name().map(|n| n.to_string()),
+                ));
+            }
+
+            keys.push(key);
+        }
+
+        for (idx, counter) in counters.iter_mut().enumerate() {
+            self.insert_or_update(&keys[idx], counter, delta)?;
+        }
+
+        Ok(Authorization::Ok)
+    }
+
+    fn get_counters(&self, limits: &HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr> {
+        let mut counters = HashSet::default();
+        let namespaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect();
+        for ns in namespaces {
+            for entry in self.db.prefix_iterator(prefix_for_namespace(ns)) {
+                let (key, value) = entry?;
+                let mut counter = partial_counter_from_counter_key(key.as_ref());
+                if counter.namespace().as_ref() != ns {
+                    break;
+                }
+                let value: ExpiringValue = value.as_ref().try_into()?;
+                for limit in limits {
+                    if limit == counter.limit() {
+                        counter.update_to_limit(limit);
+                        let ttl = value.ttl();
+                        counter.set_expires_in(ttl);
+                        counter.set_remaining(limit.max_value() - value.value());
+                        break;
+                    }
+                }
+                if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO {
+                    counters.insert(counter);
+                }
+            }
+        }
+        Ok(counters)
+    }
+
+    fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
+        let counters = self.get_counters(&limits)?;
+        for counter in &counters {
+            self.db.delete(key_for_counter(counter))?;
+        }
+        Ok(())
+    }
+
+    fn clear(&self) -> Result<(), StorageErr> {
+        for entry in self.db.iterator(IteratorMode::Start) {
+            self.db.delete(entry?.0)?
+        }
+        Ok(())
+    }
+}
+impl RocksDbStorage {
+    pub fn open<P: AsRef<std::path::Path>>(path: P, mode: OptimizeFor) -> Result<Self, StorageErr> {
+        let mut opts = Options::default();
+        match mode {
+            OptimizeFor::Space => {
+                opts.set_compression_type(DBCompressionType::Bz2);
+                opts.set_compaction_filter("ExpiredValueFilter", |_level, _key, value| {
+                    if let Ok(value) = ExpiringValue::try_from(value) {
+                        if value.value_at(SystemTime::now()) != 0 {
+                            return CompactionDecision::Keep;
+                        }
+                    }
+                    CompactionDecision::Remove
+                });
+            }
+            OptimizeFor::Throughput => {
+                opts.set_compression_type(DBCompressionType::None);
+            }
+        }
+        opts.set_merge_operator_associative("ExpiringValueMerge", |_key, start, operands| {
+            let now = SystemTime::now();
+            let mut value: ExpiringValue = start
+                .map(|raw: &[u8]| raw.try_into().unwrap_or_default())
+                .unwrap_or_default();
+            for op in operands {
+                // ignore (corrupted?) values pending merges
+                if let Ok(pending) = ExpiringValue::try_from(op) {
+                    value = value.merge(pending, now);
+                }
+            }
+            Some(Vec::from(value))
+        });
+        opts.create_if_missing(true);
+        let db = DB::open(&opts, path).unwrap();
+        Ok(Self { db })
+    }
+
+    fn insert_or_update(
+        &self,
+        key: &[u8],
+        counter: &Counter,
+        delta: i64,
+    ) -> Result<ExpiringValue, StorageErr> {
+        let now = SystemTime::now();
+        let value = match self.db.get(key)? {
+            None => ExpiringValue::default(),
+            Some(raw) => {
+                let slice: &[u8] = raw.as_ref();
+                slice.try_into()?
+            }
+        };
+        if value.value_at(now) + delta <= counter.max_value() {
+            let expiring_value =
+                ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds()));
+            self.db
+                .merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
+            return Ok(value.update(delta, counter.seconds(), now));
+        }
+        Ok(value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::RocksDbStorage;
+    use crate::counter::Counter;
+    use crate::limit::Limit;
+    use crate::storage::disk::OptimizeFor;
+    use crate::storage::CounterStorage;
+    use std::collections::HashMap;
+    use std::fs;
+    use std::time::Duration;
+    use tempdir::TempDir;
+
+    #[test]
+    fn opens_db_on_disk() {
+        let namespace = "test_namespace";
+        let limit = Limit::new(namespace, 1, 2, vec!["req.method == 'GET'"], vec!["app_id"]);
+        let counter = Counter::new(limit, HashMap::default());
+
+        let tmp = TempDir::new("limitador-disk-tests").expect("We should have a dir!");
+        {
+            let storage = RocksDbStorage::open(tmp.path(), OptimizeFor::Space)
+                .expect("We should have a storage");
+            let mut files = fs::read_dir(tmp.as_ref()).expect("Couldn't access data dir");
+            assert!(files.next().is_some());
+
+            assert!(
+                storage.is_within_limits(&counter, 1).unwrap(),
+                "Should be a fresh value"
+            );
+            assert!(
+                storage.is_within_limits(&counter, 1).unwrap(),
+                "Should be from the store, yet still below threshold"
+            );
+            std::thread::sleep(Duration::from_secs(2));
+            assert!(
+                storage.update_counter(&counter, 1).is_ok(),
+                "Should have written the counter to disk"
+            );
+            assert!(
+                !storage.is_within_limits(&counter, 1).unwrap(),
+                "Should now be above threshold!"
+            );
+        }
+
+        {
+            let storage = RocksDbStorage::open(tmp.path(), OptimizeFor::Space)
+                .expect("We should still have a storage");
+            assert!(
+                !storage.is_within_limits(&counter, 1).unwrap(),
+                "Should be above threshold still!"
+            );
+        }
+    }
+}
diff --git a/limitador/src/storage/keys.rs b/limitador/src/storage/keys.rs
index c3498d10..51b257c2 100644
--- a/limitador/src/storage/keys.rs
+++ b/limitador/src/storage/keys.rs
@@ -17,8 +17,8 @@ use crate::limit::Limit;
 
 pub fn key_for_counter(counter: &Counter) -> String {
     format!(
-        "namespace:{{{}}},counter:{}",
-        counter.namespace().as_ref(),
+        "{},counter:{}",
+        prefix_for_namespace(counter.namespace().as_ref()),
         serde_json::to_string(counter).unwrap()
     )
 }
@@ -31,11 +31,12 @@ pub fn key_for_counters_of_limit(limit: &Limit) -> String {
     )
 }
 
-pub fn counter_from_counter_key(key: &str, limit: &Limit) -> Counter {
-    let counter_prefix = "counter:";
-    let start_pos_counter = key.find(counter_prefix).unwrap() + counter_prefix.len();
+pub fn prefix_for_namespace(namespace: &str) -> String {
+    format!("namespace:{{{namespace}}},")
+}
 
-    let mut counter: Counter = serde_json::from_str(&key[start_pos_counter..]).unwrap();
+pub fn counter_from_counter_key(key: &str, limit: &Limit) -> Counter {
+    let mut counter = partial_counter_from_counter_key(key, limit.namespace().as_ref());
     if !counter.update_to_limit(limit) {
         // this means some kind of data corruption _or_ most probably
         // an out of sync `impl PartialEq for Limit` vs `pub fn key_for_counter(counter: &Counter) -> String`
@@ -48,10 +49,23 @@ pub fn counter_from_counter_key(key: &str, limit: &Limit) -> Counter {
     counter
 }
 
+pub fn partial_counter_from_counter_key(key: &str, namespace: &str) -> Counter {
+    let offset = ",counter:".len();
+    let start_pos_counter = prefix_for_namespace(namespace).len() + offset;
+
+    let counter: Counter = serde_json::from_str(&key[start_pos_counter..]).unwrap();
+    counter
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::storage::keys::key_for_counters_of_limit;
+    use super::{
+        key_for_counter, key_for_counters_of_limit, partial_counter_from_counter_key,
+        prefix_for_namespace,
+    };
+    use crate::counter::Counter;
     use crate::Limit;
+    use std::collections::HashMap;
 
     #[test]
     fn key_for_limit_format() {
@@ -66,4 +80,157 @@ mod tests {
             "namespace:{example.com},counters_of_limit:{\"namespace\":\"example.com\",\"seconds\":60,\"conditions\":[\"req.method == \\\"GET\\\"\"],\"variables\":[\"app_id\"]}",
             key_for_counters_of_limit(&limit))
     }
+
+    #[test]
+    fn counter_key_and_counter_are_symmetric() {
+        let namespace = "ns_counter:";
+        let limit = Limit::new(namespace, 1, 1, vec!["req.method == 'GET'"], vec!["app_id"]);
+        let counter = Counter::new(limit.clone(), HashMap::default());
+        let raw = key_for_counter(&counter);
+        assert_eq!(counter, partial_counter_from_counter_key(&raw, namespace));
+        let prefix = prefix_for_namespace(namespace);
+        assert_eq!(&raw[0..prefix.len()], &prefix);
+    }
+}
+
+#[cfg(feature = "disk_storage")]
+pub mod bin {
+    use serde::{Deserialize, Serialize};
+    use std::collections::HashMap;
+
+    use crate::counter::Counter;
+    use crate::limit::Limit;
+
+    #[derive(PartialEq, Debug, Serialize, Deserialize)]
+    struct CounterKey<'a> {
+        ns: &'a str,
+        seconds: u64,
+        conditions: Vec<String>,
+        variables: Vec<(&'a str, &'a str)>,
+    }
+
+    impl<'a> From<&'a Counter> for CounterKey<'a> {
+        fn from(counter: &'a Counter) -> Self {
+            let set = counter.limit().conditions();
+            let mut conditions = Vec::with_capacity(set.len());
+            for cond in &set {
+                conditions.push(cond.clone());
+            }
+            conditions.sort();
+
+            CounterKey {
+                ns: counter.namespace().as_ref(),
+                seconds: counter.seconds(),
+                conditions,
+                variables: counter.variables_for_key(),
+            }
+        }
+    }
+
+    #[derive(PartialEq, Debug, Serialize, Deserialize)]
+    struct LimitCountersKey<'a> {
+        ns: &'a str,
+        seconds: u64,
+        conditions: Vec<String>,
+        variables: Vec<&'a str>,
+    }
+
+    impl<'a> From<&'a Limit> for LimitCountersKey<'a> {
+        fn from(limit: &'a Limit) -> Self {
+            let set = limit.conditions();
+            let mut conditions = Vec::with_capacity(set.len());
+            for cond in &set {
+                conditions.push(cond.clone());
+            }
+            conditions.sort();
+
+            LimitCountersKey {
+                ns: limit.namespace().as_ref(),
+                seconds: limit.seconds(),
+                conditions,
+                variables: limit.variables_for_key(),
+            }
+        }
+    }
+
+    pub fn key_for_counter(counter: &Counter) -> Vec<u8> {
+        let key: CounterKey = counter.into();
+        postcard::to_stdvec(&key).unwrap()
+    }
+
+    pub fn prefix_for_namespace(namespace: &str) -> Vec<u8> {
+        postcard::to_stdvec(namespace).unwrap()
+    }
+
+    pub fn partial_counter_from_counter_key(key: &[u8]) -> Counter {
+        let key: CounterKey = postcard::from_bytes(key).unwrap();
+        let CounterKey {
+            ns,
+            seconds,
+            conditions,
+            variables,
+        } = key;
+
+        let map: HashMap<String, String> = variables
+            .into_iter()
+            .map(|(var, value)| (var.to_string(), value.to_string()))
+            .collect();
+        let limit = Limit::new(ns, i64::default(), seconds, conditions, map.keys());
+        Counter::new(limit, map)
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::{
+            key_for_counter, partial_counter_from_counter_key, prefix_for_namespace, CounterKey,
+        };
+        use crate::counter::Counter;
+        use crate::Limit;
+        use std::collections::HashMap;
+
+        #[test]
+        fn counter_key_serializes_and_back() {
+            let namespace = "ns_counter:";
+            let limit = Limit::new(
+                namespace,
+                1,
+                2,
+                vec!["foo == 'bar'"],
+                vec!["app_id", "role", "wat"],
+            );
+            let mut vars = HashMap::default();
+            vars.insert("role".to_string(), "admin".to_string());
+            vars.insert("app_id".to_string(), "123".to_string());
+            vars.insert("wat".to_string(), "dunno".to_string());
+            let counter = Counter::new(limit.clone(), vars);
+
+            let raw = key_for_counter(&counter);
+            let key_back: CounterKey =
+                postcard::from_bytes(&raw).expect("This should deserialize back!");
+            let key: CounterKey = (&counter).into();
+            assert_eq!(key_back, key);
+        }
+
+        #[test]
+        fn counter_key_and_counter_are_symmetric() {
+            let namespace = "ns_counter:";
+            let limit = Limit::new(namespace, 1, 1, vec!["req.method == 'GET'"], vec!["app_id"]);
+            let mut variables = HashMap::default();
+            variables.insert("app_id".to_string(), "123".to_string());
+            let counter = Counter::new(limit.clone(), variables);
+            let raw = key_for_counter(&counter);
+            assert_eq!(counter, partial_counter_from_counter_key(&raw));
+        }
+
+        #[test]
+        fn counter_key_starts_with_namespace_prefix() {
+            let namespace = "ns_counter:";
+            let limit = Limit::new(namespace, 1, 1, vec!["req.method == 'GET'"], vec!["app_id"]);
+            let counter = Counter::new(limit, HashMap::default());
+            let serialized_counter = key_for_counter(&counter);
+
+            let prefix = prefix_for_namespace(namespace);
+            assert_eq!(&serialized_counter[..prefix.len()], &prefix);
+        }
+    }
 }
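The binary keys produced by the `bin` module are postcard-serialized structs, and because the conditions and variables are sorted first, equal counters always map to identical byte strings. A standalone sketch using an illustrative stand-in for the crate-private `CounterKey` (assumes serde's `derive` feature; field values are examples):

```rust
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Debug, Serialize, Deserialize)]
struct Key<'a> {
    ns: &'a str,
    seconds: u64,
    conditions: Vec<String>,
    variables: Vec<(&'a str, &'a str)>,
}

fn main() {
    let key = Key {
        ns: "example.com",
        seconds: 60,
        conditions: vec!["req.method == 'GET'".to_string()],
        variables: vec![("app_id", "123")],
    };
    // postcard's encoding is compact and deterministic for a given value,
    // which is what makes these byte strings usable as RocksDB keys.
    let raw: Vec<u8> = postcard::to_stdvec(&key).unwrap();
    let back: Key = postcard::from_bytes(&raw).unwrap();
    assert_eq!(back, key);
}
```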
diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs
index e9751e79..25e97531 100644
--- a/limitador/src/storage/mod.rs
+++ b/limitador/src/storage/mod.rs
@@ -6,6 +6,8 @@ use std::collections::{HashMap, HashSet};
 use std::sync::RwLock;
 use thiserror::Error;
 
+#[cfg(feature = "disk_storage")]
+pub mod disk;
 pub mod in_memory;
 pub mod wasm;
 
@@ -15,7 +17,6 @@ pub mod redis;
 #[cfg(feature = "infinispan_storage")]
 pub mod infinispan;
 
-#[cfg(any(feature = "redis_storage", feature = "infinispan_storage"))]
 mod keys;
 
 pub enum Authorization {
diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs
index 5e34681f..ae0382bd 100644
--- a/limitador/tests/integration_tests.rs
+++ b/limitador/tests/integration_tests.rs
@@ -13,6 +13,14 @@ macro_rules! test_with_all_storage_impls {
                 $function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await;
             }
 
+            #[tokio::test]
+            async fn [<$function _disk_storage>]() {
+                let dir = TempDir::new("limitador-disk-integration-tests").expect("We should have a dir!");
+                let rate_limiter =
+                    RateLimiter::new_with_storage(Box::new(DiskStorage::open(dir.path(), OptimizeFor::Throughput).expect("Couldn't open temp dir")));
+                $function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await;
+            }
+
             #[cfg(feature = "redis_storage")]
             #[tokio::test]
             #[serial]
@@ -94,11 +102,13 @@ mod test {
     use self::limitador::RateLimiter;
     use crate::helpers::tests_limiter::*;
     use limitador::limit::Limit;
+    use limitador::storage::disk::{DiskStorage, OptimizeFor};
     use limitador::storage::in_memory::InMemoryStorage;
     use limitador::storage::wasm::WasmStorage;
     use std::collections::{HashMap, HashSet};
     use std::thread::sleep;
     use std::time::{Duration, SystemTime};
+    use tempdir::TempDir;
 
     // This is only needed for the WASM-compatible storage.
     pub struct TestClock {}
@@ -432,11 +442,14 @@ mod test {
         values.insert("req.method".to_string(), "GET".to_string());
         values.insert("app_id".to_string(), "test_app_id".to_string());
 
-        for _ in 0..max_hits {
-            assert!(!rate_limiter
-                .is_rate_limited(namespace, &values, 1)
-                .await
-                .unwrap());
+        for i in 0..max_hits {
+            assert!(
+                !rate_limiter
+                    .is_rate_limited(namespace, &values, 1)
+                    .await
+                    .unwrap(),
+                "Must not be limited after {i}"
+            );
             rate_limiter
                 .update_counters(namespace, &values, 1)
                 .await
@@ -829,7 +842,7 @@ mod test {
         let limit_time = 1;
 
         let limit = Limit::new(
-            "test_namespace",
+            namespace,
             10,
             limit_time,
             vec!["req.method == 'GET'"],
@@ -849,11 +862,7 @@ mod test {
         // Give it some extra time to expire
         sleep(Duration::from_secs(limit_time + 1));
 
-        assert!(rate_limiter
-            .get_counters(namespace)
-            .await
-            .unwrap()
-            .is_empty());
+        assert_eq!(rate_limiter.get_counters(namespace).await.unwrap().len(), 0);
     }
 
     async fn configure_with_creates_the_given_limits(rate_limiter: &mut TestsLimiter) {
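For readers unfamiliar with the paste-style `[<...>]` identifier concatenation used in the macro arm above, this is roughly what it generates for a hypothetical test function named `rate_limited` (the name and the exact formatting are illustrative, not taken from the patch):

```rust
// Sketch of one expansion of the new macro arm: every test in the suite gets a
// *_disk_storage variant that runs against a RocksDB store in a throwaway TempDir.
#[tokio::test]
async fn rate_limited_disk_storage() {
    let dir = TempDir::new("limitador-disk-integration-tests").expect("We should have a dir!");
    let rate_limiter = RateLimiter::new_with_storage(Box::new(
        DiskStorage::open(dir.path(), OptimizeFor::Throughput).expect("Couldn't open temp dir"),
    ));
    rate_limited(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await;
}
```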