Skip to content

Commit

Permalink
feat: support purge http cache
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Sep 7, 2024
1 parent 249feab commit 0e0ffa7
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 87 deletions.
46 changes: 23 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
- [ ] log rotate
- [ ] secret storage
- [ ] support include comnand for configuraion
- [ ] accept encoding adjustment plugin
- [x] support purge http cache
- [x] support docker service discovery
- [x] upstream and location update notification
- [x] support validate config for plugin
Expand Down
1 change: 0 additions & 1 deletion src/logger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::path::Path;
use std::sync::Mutex;
use tracing::{info, Level};
use tracing_subscriber::fmt::writer::BoxMakeWriter;

use crate::util;

#[derive(Default, Debug)]
Expand Down
83 changes: 63 additions & 20 deletions src/plugin/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ use crate::config::{
};
use crate::http_extra::HttpResponse;
use crate::state::State;
use crate::util;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use bytesize::ByteSize;
use http::Method;
use http::{Method, StatusCode};
use humantime::parse_duration;
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
use pingora::cache::eviction::simple_lru::Manager;
use pingora::cache::eviction::EvictionManager;
use pingora::cache::key::CacheHashKey;
use pingora::cache::lock::CacheLock;
use pingora::cache::predictor::{CacheablePredictor, Predictor};
use pingora::proxy::Session;
Expand All @@ -54,6 +56,7 @@ pub struct Cache {
max_ttl: Option<Duration>,
namespace: Option<String>,
headers: Option<Vec<String>>,
purge_ip_rules: util::IpRules,
hash_value: String,
}

Expand Down Expand Up @@ -177,6 +180,9 @@ impl TryFrom<&PluginConf> for Cache {
None
};

let purge_ip_rules =
util::IpRules::new(&get_str_slice_conf(value, "purge_ip_list"));

let params = Self {
hash_value,
http_cache: cache,
Expand All @@ -188,6 +194,7 @@ impl TryFrom<&PluginConf> for Cache {
max_file_size: max_file_size.as_u64() as usize,
namespace,
headers,
purge_ip_rules,
};
if params.plugin_step != PluginStep::Request {
return Err(Error::Invalid {
Expand All @@ -207,6 +214,9 @@ impl Cache {
}
}

static METHOD_PURGE: Lazy<Method> =
Lazy::new(|| Method::from_bytes(b"PURGE").unwrap());

#[async_trait]
impl Plugin for Cache {
#[inline]
Expand All @@ -224,26 +234,13 @@ impl Plugin for Cache {
return Ok(None);
}
// cache only support get or head
if ![Method::GET, Method::HEAD].contains(&session.req_header().method) {
let method = &session.req_header().method;
if ![Method::GET, Method::HEAD, METHOD_PURGE.to_owned()]
.contains(method)
{
return Ok(None);
}
// max age of cache control
ctx.cache_max_ttl = self.max_ttl;

session.cache.enable(
self.http_cache,
self.eviction,
self.predictor,
self.lock,
);
// set max size of cache response body
if self.max_file_size > 0 {
session.cache.set_max_file_size_bytes(self.max_file_size);
}
if let Some(stats) = self.http_cache.stats() {
ctx.cache_reading = Some(stats.reading);
ctx.cache_writing = Some(stats.writing);
}
let mut keys = BytesMut::with_capacity(64);
if let Some(namespace) = &self.namespace {
keys.put(namespace.as_bytes());
Expand All @@ -264,6 +261,52 @@ impl Plugin for Cache {
debug!("Cache prefix: {prefix}");
ctx.cache_prefix = Some(prefix);
}
if method == METHOD_PURGE.to_owned() {
let found = match self
.purge_ip_rules
.matched(&util::get_client_ip(session))
{
Ok(matched) => matched,
Err(e) => {
return Ok(Some(HttpResponse::bad_request(
e.to_string().into(),
)));
},
};
if !found {
return Ok(Some(HttpResponse {
status: StatusCode::FORBIDDEN,
body: Bytes::from_static(b"Forbidden, ip is not allowed"),
..Default::default()
}));
}

let key = util::get_cache_key(
&ctx.cache_prefix.clone().unwrap_or_default(),
Method::GET.as_ref(),
&session.req_header().uri,
);
self.http_cache.cached.remove(&key.combined()).await?;
return Ok(Some(HttpResponse::no_content()));
}

// max age of cache control
ctx.cache_max_ttl = self.max_ttl;

session.cache.enable(
self.http_cache,
self.eviction,
self.predictor,
self.lock,
);
// set max size of cache response body
if self.max_file_size > 0 {
session.cache.set_max_file_size_bytes(self.max_file_size);
}
if let Some(stats) = self.http_cache.stats() {
ctx.cache_reading = Some(stats.reading);
ctx.cache_writing = Some(stats.writing);
}

Ok(None)
}
Expand Down
51 changes: 14 additions & 37 deletions src/plugin/ip_restriction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ use crate::util;
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
use ipnet::IpNet;
use pingora::proxy::Session;
use std::net::IpAddr;
use std::str::FromStr;
use tracing::debug;

pub struct IpRestriction {
plugin_step: PluginStep,
ip_net_list: Vec<IpNet>,
ip_list: Vec<String>,
ip_rules: util::IpRules,
restriction_category: String,
forbidden_resp: HttpResponse,
hash_value: String,
Expand All @@ -44,24 +40,16 @@ impl TryFrom<&PluginConf> for IpRestriction {
let hash_value = get_hash_key(value);
let step = get_step_conf(value);

let mut ip_net_list = vec![];
let mut ip_list = vec![];
for item in get_str_slice_conf(value, "ip_list") {
if let Ok(value) = IpNet::from_str(&item) {
ip_net_list.push(value);
} else {
ip_list.push(item);
}
}
let ip_rules =
util::IpRules::new(&get_str_slice_conf(value, "ip_list"));
let mut message = get_str_conf(value, "message");
if message.is_empty() {
message = "Request is forbidden".to_string();
}
let params = Self {
hash_value,
plugin_step: step,
ip_list,
ip_net_list,
ip_rules,
restriction_category: get_str_conf(value, "type"),
forbidden_resp: HttpResponse {
status: StatusCode::FORBIDDEN,
Expand Down Expand Up @@ -112,20 +100,15 @@ impl Plugin for IpRestriction {
ip
};

let found = if self.ip_list.contains(&ip) {
true
} else {
match ip.parse::<IpAddr>() {
Ok(addr) => {
self.ip_net_list.iter().any(|item| item.contains(&addr))
},
Err(e) => {
return Ok(Some(HttpResponse::bad_request(
e.to_string().into(),
)));
},
}
let found = match self.ip_rules.matched(&ip) {
Ok(matched) => matched,
Err(e) => {
return Ok(Some(HttpResponse::bad_request(
e.to_string().into(),
)));
},
};

// deny ip
let allow = if self.restriction_category == "deny" {
!found
Expand Down Expand Up @@ -167,15 +150,9 @@ type = "deny"
)
.unwrap();
assert_eq!("request", params.plugin_step.to_string());
assert_eq!("192.168.1.1,10.1.1.1", params.ip_list.join(","));
assert_eq!(
"1.1.1.0/24,2.1.1.0/24",
params
.ip_net_list
.iter()
.map(|item| item.to_string())
.collect::<Vec<_>>()
.join(",")
r#"IpRules { ip_net_list: [1.1.1.0/24, 2.1.1.0/24], ip_list: ["192.168.1.1", "10.1.1.1"] }"#,
format!("{:?}", params.ip_rules)
);

let result = IpRestriction::try_from(
Expand Down
Loading

0 comments on commit 0e0ffa7

Please sign in to comment.