Skip to content

Commit

Permalink
Merge pull request #267 from adam-cattermole/tracing-metrics-layer
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-cattermole authored Apr 4, 2024
2 parents cce84e4 + 0cf8bd3 commit 22f65b0
Show file tree
Hide file tree
Showing 13 changed files with 430 additions and 326 deletions.
36 changes: 9 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ lazy_static = "1.4.0"
clap = "4.3"
sysinfo = "0.29.7"
openssl = { version = "0.10.57", features = ["vendored"] }
prometheus = "0.13.3"


[build-dependencies]
Expand Down
5 changes: 4 additions & 1 deletion limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_
use crate::envoy_rls::server::envoy::service::ratelimit::v3::{
RateLimitRequest, RateLimitResponse,
};
use crate::Limiter;
use crate::{Limiter, PROMETHEUS_METRICS};

include!("envoy_types.rs");

Expand Down Expand Up @@ -124,8 +124,11 @@ impl RateLimitService for MyRateLimiter {

let mut rate_limited_resp = rate_limited_resp.unwrap();
let resp_code = if rate_limited_resp.limited {
PROMETHEUS_METRICS
.incr_limited_calls(&namespace, rate_limited_resp.limit_name.as_deref());
Code::OverLimit
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
Code::Ok
};

Expand Down
14 changes: 7 additions & 7 deletions limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit};
use crate::Limiter;
use crate::{Limiter, PROMETHEUS_METRICS};
use actix_web::{http::StatusCode, ResponseError};
use actix_web::{App, HttpServer};
use paperclip::actix::{
Expand Down Expand Up @@ -44,13 +44,10 @@ async fn status() -> web::Json<()> {
Json(())
}

#[tracing::instrument(skip(data))]
#[tracing::instrument(skip(_data))]
#[api_v2_operation]
async fn metrics(data: web::Data<Arc<Limiter>>) -> String {
match data.get_ref().as_ref() {
Limiter::Blocking(limiter) => limiter.gather_prometheus_metrics(),
Limiter::Async(limiter) => limiter.gather_prometheus_metrics(),
}
async fn metrics(_data: web::Data<Arc<Limiter>>) -> String {
PROMETHEUS_METRICS.gather_metrics()
}

#[api_v2_operation]
Expand Down Expand Up @@ -170,8 +167,11 @@ async fn check_and_report(
match rate_limited_and_update_result {
Ok(is_rate_limited) => {
if is_rate_limited.limited {
PROMETHEUS_METRICS
.incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref());
Err(ErrorResponse::TooManyRequests)
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
Ok(Json(()))
}
}
Expand Down
68 changes: 31 additions & 37 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ use crate::config::{
};
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
use crate::metrics::MetricsLayer;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use lazy_static::lazy_static;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
Expand All @@ -36,6 +38,7 @@ use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::{trace, Resource};
use prometheus_metrics::PrometheusMetrics;
use std::env::VarError;
use std::fmt::Display;
use std::fs;
Expand All @@ -55,6 +58,8 @@ mod envoy_rls;
mod http_api;

mod config;
mod metrics;
pub mod prometheus_metrics;

const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION");
const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE");
Expand All @@ -71,6 +76,10 @@ pub enum LimitadorServerError {
Internal(LimitadorError),
}

lazy_static! {
pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default();
}

pub enum Limiter {
Blocking(RateLimiter),
Async(AsyncRateLimiter),
Expand All @@ -85,29 +94,19 @@ impl From<LimitadorError> for LimitadorServerError {
impl Limiter {
pub async fn new(config: Configuration) -> Result<Self, LimitadorServerError> {
let rate_limiter = match config.storage {
StorageConfiguration::Redis(cfg) => {
Self::redis_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await,
#[cfg(feature = "infinispan")]
StorageConfiguration::Infinispan(cfg) => {
Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::InMemory(cfg) => {
Self::in_memory_limiter(cfg, config.limit_name_in_labels)
}
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await,
StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
};

Ok(rate_limiter)
}

async fn redis_limiter(cfg: RedisStorageConfiguration, limit_name_labels: bool) -> Self {
async fn redis_limiter(cfg: RedisStorageConfiguration) -> Self {
let storage = Self::storage_using_redis(cfg).await;
let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);

if limit_name_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}
let rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);

Self::Async(rate_limiter_builder.build())
}
Expand Down Expand Up @@ -152,10 +151,7 @@ impl Limiter {
}

#[cfg(feature = "infinispan")]
async fn infinispan_limiter(
cfg: InfinispanStorageConfiguration,
limit_name_labels: bool,
) -> Self {
async fn infinispan_limiter(cfg: InfinispanStorageConfiguration) -> Self {
use url::Url;

let parsed_url = Url::parse(&cfg.url).unwrap();
Expand Down Expand Up @@ -191,42 +187,30 @@ impl Limiter {
None => builder.build().await,
};

let mut rate_limiter_builder =
let rate_limiter_builder =
AsyncRateLimiterBuilder::new(AsyncStorage::with_counter_storage(Box::new(storage)));

if limit_name_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Async(rate_limiter_builder.build())
}

fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self {
fn disk_limiter(cfg: DiskStorageConfiguration) -> Self {
let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) {
Ok(storage) => storage,
Err(err) => {
eprintln!("Failed to open DB at {}: {err}", cfg.path);
process::exit(1)
}
};
let mut rate_limiter_builder =
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Blocking(rate_limiter_builder.build())
}

fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self {
let mut rate_limiter_builder =
fn in_memory_limiter(cfg: InMemoryStorageConfiguration) -> Self {
let rate_limiter_builder =
RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap());

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Self::Blocking(rate_limiter_builder.build())
}

Expand Down Expand Up @@ -292,6 +276,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::layer()
};

let metrics_layer = MetricsLayer::new().gather(
"should_rate_limit",
|timings| PROMETHEUS_METRICS.counter_access(Duration::from(timings)),
vec!["datastore"],
);

if !config.tracing_endpoint.is_empty() {
global::set_text_map_propagator(TraceContextPropagator::new());
let tracer = opentelemetry_otlp::new_pipeline()
Expand All @@ -308,16 +298,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(level)
.with(metrics_layer)
.with(fmt_layer)
.with(telemetry_layer)
.init();
} else {
tracing_subscriber::registry()
.with(level)
.with(metrics_layer)
.with(fmt_layer)
.init();
};

PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels);

info!("Version: {}", version);
info!("Using config: {:?}", config);
config
Expand Down
Loading

0 comments on commit 22f65b0

Please sign in to comment.