Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crdt #324

Merged
merged 10 commits into from
May 17, 2024
Merged

Crdt #324

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ documentation = "https://kuadrant.io/docs/limitador"
readme = "README.md"
edition = "2021"

[features]
distributed_storage = ["limitador/distributed_storage"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
Expand Down
9 changes: 9 additions & 0 deletions limitador-server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::process::Command;
fn main() -> Result<(), Box<dyn Error>> {
set_git_hash("LIMITADOR_GIT_HASH");
set_profile("LIMITADOR_PROFILE");
set_features("LIMITADOR_FEATURES");
generate_protobuf()
}

Expand All @@ -31,6 +32,14 @@ fn set_profile(env: &str) {
}
}

fn set_features(env: &str) {
let mut features = vec![];
if cfg!(feature = "distributed_storage") {
features.push("+distributed");
}
println!("cargo:rustc-env={env}={features:?}");
}

fn set_git_hash(env: &str) {
let git_sha = Command::new("/usr/bin/git")
.args(["rev-parse", "HEAD"])
Expand Down
4 changes: 1 addition & 3 deletions limitador-server/examples/limits.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
---
-
namespace: test_namespace
max_value: 10
max_value: 1000000
seconds: 60
conditions:
- "req.method == 'GET'"
variables:
- user_id
-
namespace: test_namespace
max_value: 5
Expand Down
11 changes: 11 additions & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,24 @@ pub enum StorageConfiguration {
InMemory(InMemoryStorageConfiguration),
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
#[cfg(feature = "distributed_storage")]
Distributed(DistributedStorageConfiguration),
}

#[derive(PartialEq, Eq, Debug)]
pub struct InMemoryStorageConfiguration {
pub cache_size: Option<u64>,
}

#[derive(PartialEq, Eq, Debug)]
#[cfg(feature = "distributed_storage")]
pub struct DistributedStorageConfiguration {
pub name: String,
pub cache_size: Option<u64>,
pub local: String,
pub broadcast: String,
}

#[derive(PartialEq, Eq, Debug)]
pub struct DiskStorageConfiguration {
pub path: String,
Expand Down
71 changes: 69 additions & 2 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
extern crate log;
extern crate clap;

#[cfg(feature = "distributed_storage")]
use crate::config::DistributedStorageConfiguration;
use crate::config::{
Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration,
RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
Expand All @@ -23,6 +25,8 @@ use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE,
DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
};
#[cfg(feature = "distributed_storage")]
use limitador::storage::DistributedInMemoryStorage;
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder,
Expand Down Expand Up @@ -57,6 +61,7 @@ pub mod prometheus_metrics;

const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION");
const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE");
const LIMITADOR_FEATURES: &str = env!("LIMITADOR_FEATURES");
const LIMITADOR_HEADER: &str = "Limitador Server";

#[derive(Error, Debug)]
Expand All @@ -83,6 +88,8 @@ impl Limiter {
let rate_limiter = match config.storage {
StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await,
StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
#[cfg(feature = "distributed_storage")]
StorageConfiguration::Distributed(cfg) => Self::distributed_limiter(cfg),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
};

Expand Down Expand Up @@ -154,6 +161,20 @@ impl Limiter {
Self::Blocking(rate_limiter_builder.build())
}

#[cfg(feature = "distributed_storage")]
fn distributed_limiter(cfg: DistributedStorageConfiguration) -> Self {
let storage = DistributedInMemoryStorage::new(
cfg.name,
cfg.cache_size.or_else(guess_cache_size).unwrap(),
cfg.local,
cfg.broadcast,
);
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

Self::Blocking(rate_limiter_builder.build())
}

pub async fn load_limits_from_file<P: AsRef<Path>>(
&self,
path: &P,
Expand Down Expand Up @@ -350,12 +371,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

fn create_config() -> (Configuration, &'static str) {
let full_version: &'static str = formatcp!(
"v{} ({}) {}",
"v{} ({}) {} {}",
LIMITADOR_VERSION,
env!("LIMITADOR_GIT_HASH"),
LIMITADOR_FEATURES,
LIMITADOR_PROFILE,
);

// wire args based of defaults
let limit_arg = Arg::new("LIMITS_FILE")
.action(ArgAction::Set)
Expand Down Expand Up @@ -565,6 +586,43 @@ fn create_config() -> (Configuration, &'static str) {
),
);

#[cfg(feature = "distributed_storage")]
let cmdline = cmdline.subcommand(
Command::new("distributed")
.about("Replicates CRDT-based counters across multiple Limitador servers")
.display_order(5)
.arg(
Arg::new("NAME")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Unique name to identify this Limitador instance"),
)
.arg(
Arg::new("LOCAL")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Local IP:PORT to send datagrams from"),
)
.arg(
Arg::new("BROADCAST")
.action(ArgAction::Set)
.required(true)
.display_order(3)
.help("Broadcast IP:PORT to send datagrams to"),
)
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(4)
.help("Sets the size of the cache for 'qualified counters'"),
),
);

let matches = cmdline.get_matches();

let limits_file = matches.get_one::<String>("LIMITS_FILE").unwrap();
Expand Down Expand Up @@ -630,6 +688,15 @@ fn create_config() -> (Configuration, &'static str) {
Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
}),
#[cfg(feature = "distributed_storage")]
Some(("distributed", sub)) => {
StorageConfiguration::Distributed(DistributedStorageConfiguration {
name: sub.get_one::<String>("NAME").unwrap().to_owned(),
local: sub.get_one::<String>("LOCAL").unwrap().to_owned(),
broadcast: sub.get_one::<String>("BROADCAST").unwrap().to_owned(),
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
})
}
None => storage_config_from_env(),
_ => unreachable!("Some storage wasn't configured!"),
};
Expand Down
1 change: 1 addition & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ edition = "2021"
[features]
default = ["disk_storage", "redis_storage"]
disk_storage = ["rocksdb"]
distributed_storage = []
redis_storage = ["redis", "r2d2", "tokio"]
lenient_conditions = []

Expand Down
47 changes: 47 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,47 @@ impl AtomicExpiryTime {
}
false
}

#[allow(dead_code)]
pub fn merge(&self, other: Self) {
let mut other = other;
loop {
let now = SystemTime::now();
other = match self.merge_at(other, now) {
Ok(_) => return,
Err(other) => other,
};
}
}

pub fn merge_at(&self, other: Self, when: SystemTime) -> Result<(), Self> {
let other_exp = other.expiry.load(Ordering::SeqCst);
let expiry = self.expiry.load(Ordering::SeqCst);
if other_exp < expiry && other_exp > Self::since_epoch(when) {
// if our expiry changed, some thread observed the time window as elapsed...
// `other` can't be in the future anymore! Safely ignoring the failure scenario
return match self.expiry.compare_exchange(
expiry,
other_exp,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => Ok(()),
Err(_) => Err(other),
};
}
Ok(())
}

#[allow(dead_code)]
pub fn into_inner(self) -> SystemTime {
self.expires_at()
}

#[allow(dead_code)]
pub fn expires_at(&self) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst))
}
}

impl Clone for AtomicExpiryTime {
Expand Down Expand Up @@ -130,6 +171,12 @@ impl Clone for AtomicExpiringValue {
}
}

impl From<SystemTime> for AtomicExpiryTime {
fn from(value: SystemTime) -> Self {
AtomicExpiryTime::new(value)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading