Skip to content

Commit

Permalink
Merge pull request #178 from Kuadrant/rocksdb-full
Browse files Browse the repository at this point in the history
Disk storage using RocksDB
  • Loading branch information
alexsnaps authored May 17, 2023
2 parents 8d89846 + 2b635e2 commit dd76789
Show file tree
Hide file tree
Showing 14 changed files with 1,090 additions and 43 deletions.
333 changes: 326 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN dnf -y --setopt=install_weak_deps=False --setopt=tsflags=nodocs install \
&& dnf -y --setopt=install_weak_deps=False --setopt=tsflags=nodocs install epel-release \
&& dnf config-manager --set-enabled powertools

RUN PKGS="gcc-c++ gcc-toolset-12-binutils-gold openssl3-devel protobuf-c protobuf-devel git" \
RUN PKGS="gcc-c++ gcc-toolset-12-binutils-gold openssl3-devel protobuf-c protobuf-devel git clang kernel-headers" \
&& dnf install --nodocs --assumeyes $PKGS \
&& rpm --verify --nogroup --nouser $PKGS \
&& yum -y clean all
Expand All @@ -39,7 +39,7 @@ RUN source $HOME/.cargo/env \
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.7

# shadow-utils is required for `useradd`
RUN PKGS="libgcc shadow-utils" \
RUN PKGS="libgcc libstdc++ shadow-utils" \
&& microdnf --assumeyes install --nodocs $PKGS \
&& rpm --verify --nogroup --nouser $PKGS \
&& microdnf -y clean all
Expand Down
8 changes: 8 additions & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// HTTP_API_PORT: port

use crate::envoy_rls::server::RateLimitHeaders;
use limitador::storage;
use log::LevelFilter;

#[derive(Debug)]
Expand Down Expand Up @@ -92,11 +93,18 @@ impl Default for Configuration {
#[derive(PartialEq, Eq, Debug)]
pub enum StorageConfiguration {
InMemory,
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
#[cfg(feature = "infinispan")]
Infinispan(InfinispanStorageConfiguration),
}

#[derive(PartialEq, Eq, Debug)]
pub struct DiskStorageConfiguration {
pub path: String,
pub optimization: storage::disk::OptimizeFor,
}

#[derive(PartialEq, Eq, Debug)]
pub struct RedisStorageConfiguration {
pub url: String,
Expand Down
71 changes: 65 additions & 6 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ extern crate clap;
#[cfg(feature = "infinispan")]
use crate::config::InfinispanStorageConfiguration;
use crate::config::{
Configuration, RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
Configuration, DiskStorageConfiguration, RedisStorageCacheConfiguration,
RedisStorageConfiguration, StorageConfiguration,
};
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
use clap::{App, Arg, SubCommand};
use env_logger::Builder;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
use limitador::storage::disk::DiskStorage;
#[cfg(feature = "infinispan")]
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
#[cfg(feature = "infinispan")]
Expand All @@ -26,8 +28,10 @@ use limitador::storage::redis::{
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use limitador::storage::{AsyncCounterStorage, AsyncStorage};
use limitador::{AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder};
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder,
};
use log::LevelFilter;
use notify::event::{ModifyKind, RenameMode};
use notify::{Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
Expand Down Expand Up @@ -82,6 +86,7 @@ impl Limiter {
Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::InMemory => Self::in_memory_limiter(config),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
};

Ok(rate_limiter)
Expand Down Expand Up @@ -199,6 +204,24 @@ impl Limiter {
Self::Async(rate_limiter_builder.build())
}

fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self {
let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) {
Ok(storage) => storage,
Err(err) => {
eprintln!("Failed to open DB at {}: {err}", cfg.path);
process::exit(1)
}
};
let mut rate_limiter_builder =
RateLimiterBuilder::new().storage(Storage::with_counter_storage(Box::new(storage)));

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Blocking(rate_limiter_builder.build())
}

fn in_memory_limiter(cfg: Configuration) -> Self {
let mut rate_limiter_builder = RateLimiterBuilder::new();

Expand Down Expand Up @@ -402,6 +425,9 @@ fn create_config() -> (Configuration, String) {
let default_http_port =
env::var("HTTP_API_PORT").unwrap_or_else(|_| Configuration::DEFAULT_HTTP_PORT.to_string());

let disk_path = env::var("DISK_PATH").unwrap_or_else(|_| "".to_string());
let disk_optimize = env::var("DISK_OPTIMIZE").unwrap_or_else(|_| "throughput".to_string());

let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "".to_string());

let redis_cached_ttl_default = env::var("REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS")
Expand Down Expand Up @@ -438,6 +464,13 @@ fn create_config() -> (Configuration, String) {
redis_url_arg.default_value(&redis_url)
};

let disk_path_arg = Arg::with_name("PATH").help("Path to counter DB").index(1);
let disk_path_arg = if disk_path.is_empty() {
disk_path_arg.required(true)
} else {
disk_path_arg.default_value(&disk_path)
};

// build app
let cmdline = App::new(LIMITADOR_HEADER)
.version(full_version.as_str())
Expand Down Expand Up @@ -520,15 +553,33 @@ fn create_config() -> (Configuration, String) {
.about("Counters are held in Limitador (ephemeral)"),
)
.subcommand(
SubCommand::with_name("redis")
SubCommand::with_name("disk")
.display_order(2)
.about("Counters are held on disk (persistent)")
.arg(disk_path_arg)
.arg(
Arg::with_name("OPTIMIZE")
.long("optimize")
.takes_value(true)
.display_order(1)
.default_value(&disk_optimize)
.value_parser(clap::builder::PossibleValuesParser::new([
"throughput",
"disk",
]))
.help("Optimizes either to save disk space or higher throughput"),
),
)
.subcommand(
SubCommand::with_name("redis")
.display_order(3)
.about("Uses Redis to store counters")
.arg(redis_url_arg.clone()),
)
.subcommand(
SubCommand::with_name("redis_cached")
.about("Uses Redis to store counters, with an in-memory cache")
.display_order(3)
.display_order(4)
.arg(redis_url_arg)
.arg(
Arg::with_name("TTL")
Expand Down Expand Up @@ -572,7 +623,7 @@ fn create_config() -> (Configuration, String) {
let cmdline = cmdline.subcommand(
SubCommand::with_name("infinispan")
.about("Uses Infinispan to store counters")
.display_order(4)
.display_order(5)
.arg(
Arg::with_name("URL")
.help("Infinispan URL to use")
Expand Down Expand Up @@ -648,6 +699,14 @@ fn create_config() -> (Configuration, String) {
url: sub.value_of("URL").unwrap().to_owned(),
cache: None,
}),
Some(("disk", sub)) => StorageConfiguration::Disk(DiskStorageConfiguration {
path: sub.value_of("PATH").expect("We need a path!").to_string(),
optimization: match sub.value_of("OPTIMIZE") {
Some("disk") => storage::disk::OptimizeFor::Space,
Some("throughput") => storage::disk::OptimizeFor::Throughput,
_ => unreachable!("Some disk OptimizeFor wasn't configured!"),
},
}),
Some(("redis_cached", sub)) => StorageConfiguration::Redis(RedisStorageConfiguration {
url: sub.value_of("URL").unwrap().to_owned(),
cache: Some(RedisStorageCacheConfiguration {
Expand Down
10 changes: 8 additions & 2 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ edition = "2021"

# We make redis and infinispan optional to be able to compile for wasm32.
[features]
default = ["redis_storage"]
default = ["disk_storage", "redis_storage"]
disk_storage = ["rocksdb"]
redis_storage = ["redis", "r2d2", "tokio"]
infinispan_storage = ["infinispan", "reqwest"]
infinispan_storage = ["infinispan", "reqwest", "base64"]
lenient_conditions = []

[dependencies]
ttl_cache = "0.5"
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
serde_json = "1"
rmp-serde = "1.1.0"
thiserror = "1"
futures = "0.3"
async-trait = "0.1"
Expand All @@ -31,6 +34,7 @@ prometheus = "0.13"
lazy_static = "1"

# Optional dependencies
rocksdb = { version = "0.20.1", optional = true, features = ["multi-threaded-cf"] }
redis = { version = "0.21", optional = true, features = [
"connection-manager",
"tokio-comp",
Expand All @@ -43,12 +47,14 @@ tokio = { version = "1", optional = true, features = [
] }
infinispan = { version = "0.3", optional = true }
reqwest = { version = "0.11", optional = true }
base64 = { version = "0.21.0", optional = true }

[dev-dependencies]
serial_test = "0.9"
criterion = { version = "0.4", features = ["html_reports"] }
paste = "1"
rand = "0.8"
tempdir = "0.3.7"
tokio = { version = "1", features = [
"rt-multi-thread",
"macros",
Expand Down
60 changes: 54 additions & 6 deletions limitador/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use criterion::{black_box, criterion_group, criterion_main, Bencher, BenchmarkId
use rand::seq::SliceRandom;

use limitador::limit::Limit;
#[cfg(feature = "disk_storage")]
use limitador::storage::disk::{DiskStorage, OptimizeFor};
use limitador::storage::in_memory::InMemoryStorage;
use limitador::storage::CounterStorage;
use limitador::RateLimiter;
use rand::SeedableRng;
Expand All @@ -10,10 +13,14 @@ use std::fmt::{Display, Formatter};

const SEED: u64 = 42;

#[cfg(not(feature = "redis"))]
#[cfg(all(not(feature = "disk_storage"), not(feature = "redis_storage")))]
criterion_group!(benches, bench_in_mem);
#[cfg(feature = "redis")]
#[cfg(all(feature = "disk_storage", not(feature = "redis_storage")))]
criterion_group!(benches, bench_in_mem, bench_disk);
#[cfg(all(not(feature = "disk_storage"), feature = "redis_storage"))]
criterion_group!(benches, bench_in_mem, bench_redis);
#[cfg(all(feature = "disk_storage", feature = "redis_storage"))]
criterion_group!(benches, bench_in_mem, bench_disk, bench_redis);

criterion_main!(benches);

Expand Down Expand Up @@ -69,31 +76,72 @@ fn bench_in_mem(c: &mut Criterion) {
BenchmarkId::new("is_rate_limited", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
let storage = Box::<limitador::storage::in_memory::InMemoryStorage>::default();
let storage = Box::<InMemoryStorage>::default();
bench_is_rate_limited(b, test_scenario, storage);
},
);
group.bench_with_input(
BenchmarkId::new("update_counters", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
let storage = Box::<limitador::storage::in_memory::InMemoryStorage>::default();
let storage = Box::<InMemoryStorage>::default();
bench_update_counters(b, test_scenario, storage);
},
);
group.bench_with_input(
BenchmarkId::new("check_rate_limited_and_update", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
let storage = Box::<limitador::storage::in_memory::InMemoryStorage>::default();
let storage = Box::<InMemoryStorage>::default();
bench_check_rate_limited_and_update(b, test_scenario, storage);
},
);
}
group.finish();
}

#[cfg(feature = "redis")]
#[cfg(feature = "disk_storage")]
fn bench_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Disk");
for (index, scenario) in TEST_SCENARIOS.iter().enumerate() {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
let prefix = format!("limitador-disk-bench-{index}-is_rate_limited");
let tmp = tempdir::TempDir::new(&prefix).expect("We should have a dir!");
let storage =
Box::new(DiskStorage::open(tmp.path(), OptimizeFor::Throughput).unwrap());
bench_is_rate_limited(b, test_scenario, storage);
},
);
group.bench_with_input(
BenchmarkId::new("update_counters", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
let prefix = format!("limitador-disk-bench-{index}-update_counters");
let tmp = tempdir::TempDir::new(&prefix).expect("We should have a dir!");
let storage =
Box::new(DiskStorage::open(tmp.path(), OptimizeFor::Throughput).unwrap());
bench_update_counters(b, test_scenario, storage);
},
);
group.bench_with_input(
BenchmarkId::new("check_rate_limited_and_update", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
let prefix = format!("limitador-disk-bench-{index}-check_rate_limited_and_update");
let tmp = tempdir::TempDir::new(&prefix).expect("We should have a dir!");
let storage =
Box::new(DiskStorage::open(tmp.path(), OptimizeFor::Throughput).unwrap());
bench_check_rate_limited_and_update(b, test_scenario, storage);
},
);
}
group.finish();
}

#[cfg(feature = "redis_storage")]
fn bench_redis(c: &mut Criterion) {
let mut group = c.benchmark_group("Redis");
for scenario in TEST_SCENARIOS {
Expand Down
10 changes: 10 additions & 0 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ impl Counter {
pub fn set_expires_in(&mut self, duration: Duration) {
self.expires_in = Some(duration)
}

#[cfg(feature = "disk_storage")]
pub(crate) fn variables_for_key(&self) -> Vec<(&str, &str)> {
let mut variables = Vec::with_capacity(self.set_variables.len());
for (var, value) in &self.set_variables {
variables.push((var.as_str(), value.as_str()));
}
variables.sort_by(|(key1, _), (key2, _)| key1.cmp(key2));
variables
}
}

impl Hash for Counter {
Expand Down
11 changes: 10 additions & 1 deletion limitador/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ impl From<String> for Namespace {
}

#[derive(Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Limit {
namespace: Namespace,
#[serde(skip_serializing, default)]
Expand Down Expand Up @@ -367,6 +366,16 @@ impl Limit {
self.variables.iter().map(|var| var.into()).collect()
}

#[cfg(feature = "disk_storage")]
pub(crate) fn variables_for_key(&self) -> Vec<&str> {
let mut variables = Vec::with_capacity(self.variables.len());
for var in &self.variables {
variables.push(var.as_str());
}
variables.sort();
variables
}

pub fn has_variable(&self, var: &str) -> bool {
self.variables.contains(var)
}
Expand Down
Loading

0 comments on commit dd76789

Please sign in to comment.