🔥 Sled and use RocksDB instead #170

Closed · wants to merge 3 commits
236 changes: 165 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Dockerfile
@@ -8,7 +8,7 @@ ENV CARGO_NET_GIT_FETCH_WITH_CLI=true
ARG RUSTC_VERSION=1.67.1
RUN apk update \
&& apk upgrade \
&& apk add build-base binutils-gold openssl3-dev protoc protobuf-dev curl git \
&& apk add build-base binutils-gold openssl3-dev protoc protobuf-dev curl git linux-headers clang \
Contributor:
There may be issues with the new ubi8-based images (#168).

Member Author:
Totally will be...

&& curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-modify-path --profile minimal --default-toolchain ${RUSTC_VERSION} -c rustfmt -y

WORKDIR /usr/src/limitador
@@ -28,7 +28,7 @@ RUN source $HOME/.cargo/env

FROM alpine:3.16

RUN apk add libgcc
RUN apk add libgcc libstdc++

RUN addgroup -g 1000 limitador \
&& adduser -D -s /bin/sh -u 1000 -G limitador limitador
5 changes: 3 additions & 2 deletions limitador/Cargo.toml
@@ -14,7 +14,8 @@ edition = "2021"

# We make redis and infinispan optional to be able to compile for wasm32.
[features]
default = ["redis_storage"]
default = ["disk_storage", "redis_storage"]
disk_storage = ["rocksdb"]
redis_storage = ["redis", "r2d2", "tokio"]
infinispan_storage = ["infinispan", "reqwest", "base64"]
lenient_conditions = []
@@ -31,9 +32,9 @@ async-trait = "0.1"
cfg-if = "1"
prometheus = "0.13"
lazy_static = "1"
sled = "0.34.7"

# Optional dependencies
rocksdb = { version = "0.20.1", optional = true, features = ["multi-threaded-cf"] }
redis = { version = "0.21", optional = true, features = [
"connection-manager",
"tokio-comp",
8 changes: 4 additions & 4 deletions limitador/benches/bench.rs
@@ -14,9 +14,9 @@ use tempdir::TempDir;
const SEED: u64 = 42;

#[cfg(not(feature = "redis"))]
criterion_group!(benches, bench_in_mem, bench_sled);
criterion_group!(benches, bench_in_mem, bench_disk);
#[cfg(feature = "redis")]
criterion_group!(benches, bench_in_mem, bench_sled, bench_redis);
criterion_group!(benches, bench_in_mem, bench_disk, bench_redis);

criterion_main!(benches);

@@ -96,8 +96,8 @@ fn bench_in_mem(c: &mut Criterion) {
group.finish();
}

fn bench_sled(c: &mut Criterion) {
let mut group = c.benchmark_group("Sled");
fn bench_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Disk");
for (index, scenario) in TEST_SCENARIOS.iter().enumerate() {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
43 changes: 30 additions & 13 deletions limitador/src/storage/disk/expiring_value.rs
@@ -1,9 +1,9 @@
use crate::storage::StorageErr;
use sled::IVec;
use std::array::TryFromSliceError;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub struct ExpiringValue {
#[derive(Clone, Debug)]
pub(crate) struct ExpiringValue {
value: i64,
expiry: SystemTime,
}
@@ -24,9 +24,7 @@ impl ExpiringValue {
self.value_at(SystemTime::now())
}

pub fn update(self, delta: i64, ttl: u64) -> Self {
let now = SystemTime::now();

pub fn update(self, delta: i64, ttl: u64, now: SystemTime) -> Self {
let expiry = if self.expiry <= now {
now + Duration::from_secs(ttl)
} else {
@@ -37,13 +35,33 @@
Self { value, expiry }
}

pub fn merge(self, other: ExpiringValue, now: SystemTime) -> Self {
if self.expiry > now {
ExpiringValue {
value: self.value + other.value,
expiry: self.expiry,
}
} else {
other
}
}

pub fn ttl(&self) -> Duration {
self.expiry
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO)
}
}

impl Default for ExpiringValue {
fn default() -> Self {
ExpiringValue {
value: 0,
expiry: SystemTime::UNIX_EPOCH,
}
}
}

impl TryFrom<&[u8]> for ExpiringValue {
type Error = TryFromSliceError;

@@ -61,7 +79,7 @@ impl TryFrom<&[u8]> for ExpiringValue {
}
}

impl From<ExpiringValue> for IVec {
impl From<ExpiringValue> for Vec<u8> {
fn from(value: ExpiringValue) -> Self {
let val: [u8; 8] = value.value.to_be_bytes();
let exp: [u8; 8] = value
@@ -70,7 +88,7 @@
.expect("Can't expire before Epoch")
.as_secs()
.to_be_bytes();
IVec::from([val, exp].concat())
[val, exp].concat()
}
}

@@ -85,7 +103,6 @@ impl From<TryFromSliceError> for StorageErr {
#[cfg(test)]
mod tests {
use super::ExpiringValue;
use sled::IVec;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

#[test]
@@ -112,7 +129,7 @@ mod tests {
#[test]
fn updates_when_valid() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10);
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

@@ -121,16 +138,16 @@
let now = SystemTime::now();
let val = ExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
let val = val.update(3, 10);
let val = val.update(3, 10, now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

#[test]
fn from_into_ivec() {
fn from_into_vec() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now);
let raw: IVec = val.into();
let back: ExpiringValue = raw.as_ref().try_into().unwrap();
let raw: Vec<u8> = val.into();
let back: ExpiringValue = raw.as_slice().try_into().unwrap();

assert_eq!(back.value, 42);
assert_eq!(
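The new `merge` method above is what the RocksDB merge operator (registered later in this PR) uses to fold pending writes into a stored counter. A minimal test-style sketch of its semantics, assuming only the constructors and accessors shown in this file:

```rust
#[test]
fn merge_semantics_sketch() {
    let now = SystemTime::now();

    // While the stored value is still live, counts are summed and the
    // original expiry is kept.
    let live = ExpiringValue::new(40, now + Duration::from_secs(60));
    let pending = ExpiringValue::new(2, now + Duration::from_secs(60));
    assert_eq!(live.merge(pending, now).value_at(now), 42);

    // Once the stored value has expired, it is discarded and the incoming
    // value wins outright.
    let expired = ExpiringValue::new(40, now - Duration::from_secs(1));
    let fresh = ExpiringValue::new(2, now + Duration::from_secs(60));
    assert_eq!(expired.merge(fresh, now).value_at(now), 2);
}
```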
8 changes: 4 additions & 4 deletions limitador/src/storage/disk/mod.rs
@@ -1,12 +1,12 @@
use crate::storage::StorageErr;

mod expiring_value;
mod sled_storage;
mod rocksdb_storage;

pub use sled_storage::SledStorage as DiskStorage;
pub use rocksdb_storage::RocksDbStorage as DiskStorage;

impl From<sled::Error> for StorageErr {
fn from(error: sled::Error) -> Self {
impl From<rocksdb::Error> for StorageErr {
fn from(error: rocksdb::Error) -> Self {
Self {
msg: format!("Underlying storage error: {error}"),
}
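For context, a minimal consumer sketch (module paths assumed from the re-export above and from the integration tests at the end of this diff): open the RocksDB-backed `DiskStorage` and hand it to a `RateLimiter`.

```rust
use limitador::storage::disk::{DiskStorage, OptimizeFor};
use limitador::RateLimiter;
use std::path::Path;

fn build_limiter(data_dir: &Path) -> RateLimiter {
    // OptimizeFor::Space enables compression plus the expired-value
    // compaction filter; OptimizeFor::Throughput disables compression.
    let storage = DiskStorage::open(data_dir, OptimizeFor::Space)
        .expect("failed to open disk storage");
    RateLimiter::new_with_storage(Box::new(storage))
}
```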
limitador/src/storage/disk/rocksdb_storage.rs
@@ -6,15 +6,18 @@ use crate::storage::keys::bin::{
key_for_counter, partial_counter_from_counter_key, prefix_for_namespace,
};
use crate::storage::{Authorization, CounterStorage, StorageErr};
use sled::{Config, Db, IVec, Mode};
use rocksdb::{
CompactionDecision, DBCompressionType, DBWithThreadMode, IteratorMode, MultiThreaded, Options,
DB,
};
use std::collections::{BTreeSet, HashSet};
use std::time::{Duration, SystemTime};

pub struct SledStorage {
db: Db,
pub struct RocksDbStorage {
db: DBWithThreadMode<MultiThreaded>,
}

impl CounterStorage for SledStorage {
impl CounterStorage for RocksDbStorage {
fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
let key = key_for_counter(counter);
let value = self.insert_or_update(&key, counter, 0)?;
@@ -37,10 +40,12 @@

for counter in &mut *counters {
let key = key_for_counter(counter);
let (val, ttl) = match self.db.get(&key)? {
let slice: &[u8] = key.as_ref();
let (val, ttl) = match self.db.get(slice)? {
None => (0, Duration::from_secs(counter.limit().seconds())),
Some(raw) => {
let value: ExpiringValue = raw.as_ref().try_into()?;
let slice: &[u8] = raw.as_ref();
let value: ExpiringValue = slice.try_into()?;
(value.value(), value.ttl())
}
};
@@ -70,7 +75,7 @@ impl CounterStorage for SledStorage {
let mut counters = HashSet::default();
let namespaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect();
for ns in namespaces {
for entry in self.db.range(prefix_for_namespace(ns)..) {
for entry in self.db.prefix_iterator(prefix_for_namespace(ns)) {
let (key, value) = entry?;
let mut counter = partial_counter_from_counter_key(key.as_ref());
if counter.namespace().as_ref() != ns {
@@ -97,28 +102,53 @@
fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
let counters = self.get_counters(&limits)?;
for counter in &counters {
self.db.remove(key_for_counter(counter))?;
self.db.delete(key_for_counter(counter))?;
}
Ok(())
}

fn clear(&self) -> Result<(), StorageErr> {
Ok(self.db.clear()?)
}
}

impl From<OptimizeFor> for Mode {
fn from(value: OptimizeFor) -> Self {
match value {
OptimizeFor::Space => Mode::LowSpace,
OptimizeFor::Throughput => Mode::HighThroughput,
for entry in self.db.iterator(IteratorMode::Start) {
self.db.delete(entry?.0)?
}
Ok(())
}
}

impl SledStorage {
impl RocksDbStorage {
pub fn open<P: AsRef<std::path::Path>>(path: P, mode: OptimizeFor) -> Result<Self, StorageErr> {
let db = Config::new().mode(mode.into()).path(path).open()?;
let mut opts = Options::default();
match mode {
OptimizeFor::Space => {
opts.set_compression_type(DBCompressionType::Bz2);
opts.set_compaction_filter("ExpiredValueFilter", |_level, _key, value| {
if let Ok(value) = ExpiringValue::try_from(value) {
if value.value_at(SystemTime::now()) != 0 {
return CompactionDecision::Keep;
}
}
CompactionDecision::Remove
});
}
OptimizeFor::Throughput => {
opts.set_compression_type(DBCompressionType::None);
}
}
opts.set_merge_operator_associative("ExpiringValueMerge", |_key, start, operands| {
let now = SystemTime::now();
let mut value: ExpiringValue = start
.map(|raw: &[u8]| raw.try_into().unwrap_or_default())
.unwrap_or_default();
for op in operands {
// ignore (corrupted?) values pending merges
if let Ok(pending) = ExpiringValue::try_from(op) {
value = value.merge(pending, now);
}
}
Some(Vec::from(value))
});
opts.create_if_missing(true);
let db = DB::open(&opts, path).unwrap();
Ok(Self { db })
}

@@ -128,38 +158,28 @@ impl SledStorage {
counter: &Counter,
delta: i64,
) -> Result<ExpiringValue, StorageErr> {
Ok(self.db.update_and_fetch(key, |prev| {
let updated_value = match prev {
Some(raw) => {
let value: ExpiringValue = match TryInto::<ExpiringValue>::try_into(raw) {
Ok(val) => val.update(delta, counter.seconds()),
Err(_) => ExpiringValue::new(
delta,
SystemTime::now() + Duration::from_secs(counter.limit().seconds()),
),
};
value
}
None => ExpiringValue::new(
delta,
SystemTime::now() + Duration::from_secs(counter.limit().seconds()),
),
};
Some::<IVec>(updated_value.into())
})?)
.map(|option| {
option
.expect("we always have a counter now!")
.as_ref()
.try_into()
.expect("This has to work!")
})
let now = SystemTime::now();
let value = match self.db.get(key)? {
None => ExpiringValue::default(),
Some(raw) => {
let slice: &[u8] = raw.as_ref();
slice.try_into()?
}
};
if value.value_at(now) + delta <= counter.max_value() {
let expiring_value =
ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds()));
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
return Ok(value.update(delta, counter.seconds(), now));
}
Ok(value)
}
}

#[cfg(test)]
mod tests {
use super::SledStorage;
use super::RocksDbStorage;
use crate::counter::Counter;
use crate::limit::Limit;
use crate::storage::disk::OptimizeFor;
Expand All @@ -177,7 +197,7 @@ mod tests {

let tmp = TempDir::new("limitador-disk-tests").expect("We should have a dir!");
{
let storage = SledStorage::open(tmp.path(), OptimizeFor::Space)
let storage = RocksDbStorage::open(tmp.path(), OptimizeFor::Space)
.expect("We should have a storage");
let mut files = fs::read_dir(tmp.as_ref()).expect("Couldn't access data dir");
assert!(files.next().is_some());
@@ -202,7 +222,7 @@ }
}

{
let storage = SledStorage::open(tmp.path(), OptimizeFor::Space)
let storage = RocksDbStorage::open(tmp.path(), OptimizeFor::Space)
.expect("We should still have a storage");
assert!(
!storage.is_within_limits(&counter, 1).unwrap(),
1 change: 1 addition & 0 deletions limitador/src/storage/mod.rs
@@ -6,6 +6,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::RwLock;
use thiserror::Error;

#[cfg(feature = "disk_storage")]
pub mod disk;
pub mod in_memory;
pub mod wasm;
2 changes: 1 addition & 1 deletion limitador/tests/integration_tests.rs
@@ -14,7 +14,7 @@ macro_rules! test_with_all_storage_impls {
}

#[tokio::test]
async fn [<$function _sled_storage>]() {
async fn [<$function _disk_storage>]() {
let dir = TempDir::new("limitador-disk-integration-tests").expect("We should have a dir!");
let rate_limiter =
RateLimiter::new_with_storage(Box::new(DiskStorage::open(dir.path(), OptimizeFor::Throughput).expect("Couldn't open temp dir")));