Skip to content

Commit

Permalink
feat: add support for getting DraftVersion03 response headers on the …
Browse files Browse the repository at this point in the history
…/check_and_report operation.

Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
  • Loading branch information
chirino committed May 1, 2024
1 parent 31ee24c commit b2695ae
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 13 deletions.
4 changes: 4 additions & 0 deletions limitador-server/docs/http_server_spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
"additionalProperties": {
"type": "string"
}
},
"response_headers": {
"type": "string",
"enum": ["none", "DraftVersion03"]
}
},
"required": [
Expand Down
1 change: 1 addition & 0 deletions limitador-server/src/http_api/request_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct CheckAndReportInfo {
pub namespace: String,
pub values: HashMap<String, String>,
pub delta: i64,
pub response_headers: Option<String>,
}

#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Apiv2Schema)]
Expand Down
197 changes: 184 additions & 13 deletions limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit};
use crate::prometheus_metrics::PrometheusMetrics;
use crate::Limiter;
use actix_web::{http::StatusCode, ResponseError};
use actix_web::{http::StatusCode, HttpResponse, HttpResponseBuilder, ResponseError};
use actix_web::{App, HttpServer};
use paperclip::actix::{
api_v2_errors,
Expand All @@ -11,6 +11,7 @@ use paperclip::actix::{
// extension trait for actix_web::App and proc-macro attributes
OpenApiExt,
};
use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;

Expand Down Expand Up @@ -118,6 +119,7 @@ async fn check(
namespace,
values,
delta,
response_headers: _,
} = request.into_inner();
let namespace = namespace.into();
let is_rate_limited_result = match state.get_ref().limiter() {
Expand Down Expand Up @@ -147,6 +149,7 @@ async fn report(
namespace,
values,
delta,
response_headers: _,
} = request.into_inner();
let namespace = namespace.into();
let update_counters_result = match data.get_ref().limiter() {
Expand All @@ -165,41 +168,128 @@ async fn report(
async fn check_and_report(
data: web::Data<RateLimitData>,
request: web::Json<CheckAndReportInfo>,
) -> Result<web::Json<()>, ErrorResponse> {
) -> HttpResponse {
let CheckAndReportInfo {
namespace,
values,
delta,
response_headers,
} = request.into_inner();
let namespace = namespace.into();
let rate_limit_data = data.get_ref();
let rate_limited_and_update_result = match rate_limit_data.limiter() {
Limiter::Blocking(limiter) => {
limiter.check_rate_limited_and_update(&namespace, &values, delta, false)
}
Limiter::Blocking(limiter) => limiter.check_rate_limited_and_update(
&namespace,
&values,
delta,
response_headers.is_some(),
),
Limiter::Async(limiter) => {
limiter
.check_rate_limited_and_update(&namespace, &values, delta, false)
.check_rate_limited_and_update(
&namespace,
&values,
delta,
response_headers.is_some(),
)
.await
}
};

match rate_limited_and_update_result {
Ok(is_rate_limited) => {
Ok(mut is_rate_limited) => {
if is_rate_limited.limited {
rate_limit_data
.metrics()
.incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref());
Err(ErrorResponse::TooManyRequests)

match response_headers {
None => HttpResponse::TooManyRequests().json(()),
Some(response_headers) => {
let mut resp = HttpResponse::TooManyRequests();
add_response_header(
&mut resp,
response_headers.as_str(),
&mut is_rate_limited.counters,
);
resp.json(())
}
}
} else {
rate_limit_data.metrics().incr_authorized_calls(&namespace);
Ok(Json(()))

match response_headers {
None => HttpResponse::Ok().json(()),
Some(response_headers) => {
let mut resp = HttpResponse::Ok();
add_response_header(
&mut resp,
response_headers.as_str(),
&mut is_rate_limited.counters,
);
resp.json(())
}
}
}
}
Err(_) => Err(ErrorResponse::InternalServerError),
Err(_) => HttpResponse::InternalServerError().json(()),
}
}

pub fn add_response_header(
resp: &mut HttpResponseBuilder,
rate_limit_headers: &str,
counters: &mut [limitador::counter::Counter],
) {
match rate_limit_headers {
// creates response headers per https://datatracker.ietf.org/doc/id/draft-polli-ratelimit-headers-03.html
"DraftVersion03" => {
// sort by the limit remaining..
counters.sort_by(|a, b| {
let a_remaining = a.remaining().unwrap_or(a.max_value());
let b_remaining = b.remaining().unwrap_or(b.max_value());
if a_remaining - b_remaining < 0 {
Ordering::Less
} else {
Ordering::Greater
}
});

let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
.push_str(format!(";name=\"{}\"", name.replace('"', "'")).as_str());
}
});

if let Some(counter) = counters.first() {
resp.insert_header((
"X-RateLimit-Limit",
format!("{}{}", counter.max_value(), all_limits_text),
));

let mut remaining = counter.remaining().unwrap_or(counter.max_value());
if remaining < 0 {
remaining = 0
}
resp.insert_header((
"X-RateLimit-Remaining".to_string(),
format!("{}", remaining),
));

if let Some(duration) = counter.expires_in() {
resp.insert_header(("X-RateLimit-Reset", format!("{}", duration.as_secs())));
}
}
}
_default => {}
};
}

pub async fn run_http_server(
address: &str,
rate_limiter: Arc<Limiter>,
Expand Down Expand Up @@ -231,11 +321,15 @@ pub async fn run_http_server(

#[cfg(test)]
mod tests {
use super::*;
use crate::Configuration;
use std::collections::HashMap;

use actix_web::{test, web};

use limitador::limit::Limit as LimitadorLimit;
use std::collections::HashMap;

use crate::Configuration;

use super::*;

// All these tests use the in-memory storage implementation to simplify. We
// know that some storage implementations like the Redis one trade
Expand Down Expand Up @@ -327,6 +421,7 @@ mod tests {
namespace: namespace.into(),
values,
delta: 1,
response_headers: None,
};

// The first request should be OK
Expand All @@ -337,6 +432,8 @@ mod tests {
.to_request();
let resp = test::call_service(&app, req).await;
assert!(resp.status().is_success());
assert_eq!(resp.headers().get("X-RateLimit-Limit"), None);
assert_eq!(resp.headers().get("X-RateLimit-Remaining"), None);

// The second request should be rate-limited
let req = test::TestRequest::post()
Expand All @@ -348,6 +445,79 @@ mod tests {
assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
}

#[actix_rt::test]
async fn test_check_and_report_with_draftversion03_response_headers() {
let limiter = Limiter::new(Configuration::default()).await.unwrap();

// Create a limit with max == 1
let namespace = "test_namespace";
let _limit = create_test_limit(&limiter, namespace, 2).await;
let rate_limiter: Arc<Limiter> = Arc::new(limiter);
let prometheus_metrics: Arc<PrometheusMetrics> = Arc::new(PrometheusMetrics::default());
let data = web::Data::new(RateLimitData::new(rate_limiter, prometheus_metrics));
let app = test::init_service(
App::new()
.app_data(data.clone())
.route("/check_and_report", web::post().to(check_and_report)),
)
.await;

// Prepare values to check
let mut values = HashMap::new();
values.insert("req.method".into(), "GET".into());
values.insert("app_id".into(), "1".into());
let info = CheckAndReportInfo {
namespace: namespace.into(),
values,
delta: 1,
response_headers: Some("DraftVersion03".to_string()),
};

// The first request should be OK
let req = test::TestRequest::post()
.uri("/check_and_report")
.data(data.clone())
.set_json(&info)
.to_request();
let resp = test::call_service(&app, req).await;

assert!(resp.status().is_success());
assert_eq!(
resp.headers().get("X-RateLimit-Limit").unwrap(),
"2, 2;w=60"
);
assert_eq!(resp.headers().get("X-RateLimit-Remaining").unwrap(), "1");

// The 2nd request should be OK
let req = test::TestRequest::post()
.uri("/check_and_report")
.data(data.clone())
.set_json(&info)
.to_request();
let resp = test::call_service(&app, req).await;

assert!(resp.status().is_success());
assert_eq!(
resp.headers().get("X-RateLimit-Limit").unwrap(),
"2, 2;w=60"
);
assert_eq!(resp.headers().get("X-RateLimit-Remaining").unwrap(), "0");

// The 3rd request should be rate-limited
let req = test::TestRequest::post()
.uri("/check_and_report")
.data(data.clone())
.set_json(&info)
.to_request();
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
assert_eq!(
resp.headers().get("X-RateLimit-Limit").unwrap(),
"2, 2;w=60"
);
assert_eq!(resp.headers().get("X-RateLimit-Remaining").unwrap(), "0");
}

#[actix_rt::test]
async fn test_check_and_report_endpoints_separately() {
let namespace = "test_namespace";
Expand All @@ -373,6 +543,7 @@ mod tests {
namespace: namespace.into(),
values,
delta: 1,
response_headers: None,
};

// Without making any requests, check should return OK
Expand Down

0 comments on commit b2695ae

Please sign in to comment.