Skip to content

Commit

Permalink
Merge pull request #438 from splitgraph/experimental-timing-data
Browse files Browse the repository at this point in the history
Return timing data in X-Seafowl-Query-Time (milliseconds)
  • Loading branch information
onpaws authored Jun 23, 2023
2 parents b616465 + 1071e86 commit 72ab83f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
35 changes: 29 additions & 6 deletions src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use datafusion::error::DataFusionError;

use std::fmt::Debug;
use std::io::Write;
use std::time::Instant;
use std::{net::SocketAddr, sync::Arc};
use warp::{hyper, Rejection};

Expand Down Expand Up @@ -34,6 +35,7 @@ use warp::multipart::{FormData, Part};
use warp::reply::{with_header, Response};
use warp::{hyper::header, hyper::StatusCode, Filter, Reply};

use super::http_utils::{handle_rejection, into_response, ApiError};
use crate::auth::{token_to_principal, AccessPolicy, Action, UserContext};
use crate::catalog::DEFAULT_DB;
use crate::config::schema::{AccessSettings, MEBIBYTES};
Expand All @@ -44,13 +46,12 @@ use crate::{
},
};

use super::http_utils::{handle_rejection, into_response, ApiError};

const QUERY_HEADER: &str = "X-Seafowl-Query";
const BEARER_PREFIX: &str = "Bearer ";
// We have a very lax CORS on this, so we don't mind browsers
// caching it for as long as possible.
const CORS_MAXAGE: u32 = 86400;
const QUERY_TIME_HEADER: &str = "X-Seafowl-Query-Time";

// Vary on Origin, as warp's CORS responds with Access-Control-Allow-Origin: [origin],
// so we can't cache the response in the browser if the origin changes.
Expand Down Expand Up @@ -155,6 +156,8 @@ pub async fn uncached_read_write_query(
query: String,
mut context: Arc<DefaultSeafowlContext>,
) -> Result<Response, ApiError> {
let timer = Instant::now();

// If a specific DB name was used as a parameter in the route, scope the context to it,
// effectively making it the default DB for the duration of the session.
if database_name != context.database {
Expand Down Expand Up @@ -215,6 +218,12 @@ pub async fn uncached_read_write_query(
.headers_mut()
.insert(header::CONTENT_TYPE, content_type_with_schema(schema));
}

let elapsed = timer.elapsed().as_millis().to_string();
response
.headers_mut()
.insert(QUERY_TIME_HEADER, elapsed.parse().unwrap());

Ok(response)
}

Expand Down Expand Up @@ -285,6 +294,8 @@ pub async fn cached_read_query(
if_none_match: Option<String>,
mut context: Arc<DefaultSeafowlContext>,
) -> Result<Response, ApiError> {
let timer = Instant::now();

// Ignore dots at the end
let query_or_hash = query_or_hash.split('.').next().unwrap();

Expand Down Expand Up @@ -346,6 +357,10 @@ pub async fn cached_read_query(
let schema = physical.schema().clone();
let mut response = plan_to_response(context, physical).await?;

let elapsed = timer.elapsed().as_millis().to_string();
response
.headers_mut()
.insert(QUERY_TIME_HEADER, elapsed.parse().unwrap());
response
.headers_mut()
.insert(header::ETAG, etag.parse().unwrap());
Expand Down Expand Up @@ -477,7 +492,6 @@ pub fn filters(
.max_age(CORS_MAXAGE);

let log = warp::log(module_path!());

// Cached read query
let ctx = context.clone();
let cached_read_query_route = warp::path!(String / "q" / String)
Expand Down Expand Up @@ -605,10 +619,10 @@ pub mod tests {

use crate::catalog::DEFAULT_DB;
use crate::config::schema::{str_to_hex_hash, HttpFrontend};
use crate::testutils::schema_from_header;
use crate::testutils::{assert_header_is_float, schema_from_header};
use crate::{
context::{test_utils::in_memory_context, DefaultSeafowlContext, SeafowlContext},
frontend::http::{filters, QUERY_HEADER},
frontend::http::{filters, QUERY_HEADER, QUERY_TIME_HEADER},
};

fn http_config_from_access_policy_and_cache_control(
Expand Down Expand Up @@ -1528,7 +1542,10 @@ pub mod tests {
)]
#[case::uncached_post("POST", "/q")]
#[tokio::test]
async fn test_http_type_conversion(#[case] method: &str, #[case] path: &str) {
async fn test_http_type_conversion_and_timing_header(
#[case] method: &str,
#[case] path: &str,
) {
let context = Arc::new(in_memory_context().await);
let handler = filters(
context.clone(),
Expand Down Expand Up @@ -1582,5 +1599,11 @@ SELECT
"#
)
);

// Assert the "request-to-response" time header is present
assert!(resp.headers().contains_key(QUERY_TIME_HEADER));
// Assert that it's a float
let header_value = resp.headers().get(QUERY_TIME_HEADER).unwrap();
assert_header_is_float(header_value);
}
}
7 changes: 7 additions & 0 deletions src/testutils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cmp::min;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::Int32Array;
Expand Down Expand Up @@ -161,3 +162,9 @@ pub fn schema_from_header(headers: &HeaderMap<HeaderValue>) -> Schema {

schema_from_json(&schema_json).expect("arrow schema reconstructable from JSON")
}

pub fn assert_header_is_float(header: &HeaderValue) -> bool {
let float_str = header.to_str().unwrap();
let parsed_float = f64::from_str(float_str).unwrap();
parsed_float.is_finite()
}

0 comments on commit 72ab83f

Please sign in to comment.