Skip to content

Commit

Permalink
Add integral function for time_weight
Browse files Browse the repository at this point in the history
Implements #455
  • Loading branch information
syvb committed Sep 8, 2022
1 parent 2eb84f0 commit 6882a6f
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 6 deletions.
10 changes: 10 additions & 0 deletions crates/time-weighted-average/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ impl TimeWeightSummary {
let duration = (self.last.ts - self.first.ts) as f64;
Ok(self.w_sum / duration)
}

/// Evaluate the time-weighted integral, in units of value × microseconds.
///
/// `w_sum` already accumulates the weighted sum over the summary's span
/// (the same quantity `time_weighted_average` divides by the duration), so
/// it is returned directly; a zero-width span contributes nothing.
pub fn time_weighted_integral(&self) -> f64 {
    if self.first.ts == self.last.ts {
        // A summary spanning zero duration has, by definition, a zero integral.
        return 0.0;
    }
    self.w_sum
}
}

impl TimeWeightMethod {
Expand Down
27 changes: 26 additions & 1 deletion extension/src/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ accessor! { last_val() }
accessor! { first_time() }
accessor! { last_time() }


// The rest are more complex, with String or other challenges. Leaving alone for now.

pg_type! {
Expand Down Expand Up @@ -565,3 +564,29 @@ pub fn accessor_unnest(
}
}
}

// Zero-or-one-argument accessor for the `-> integral(unit)` arrow form.
// It carries the caller's requested duration unit as raw UTF-8 bytes
// (written by `accessor_integral` below, read back in the arrow operator).
pg_type! {
    #[derive(Debug)]
    struct AccessorIntegral<'input> {
        len: u32,                // byte length of the stored unit string
        bytes: [u8; self.len],   // the unit string itself, e.g. "second"
    }
}

// FIXME string IO
ron_inout_funcs!(AccessorIntegral);

/// SQL-visible `integral(unit)` constructor: packs the requested duration
/// unit string into an `AccessorIntegral` so it can travel through the
/// arrow operator (`time_weight(...) -> integral('hour')`).
///
/// The unit defaults to `'second'`; it is stored verbatim here and only
/// validated later, when the arrow operator evaluates the integral.
#[pg_extern(immutable, parallel_safe, name="integral")]
pub fn accessor_integral(
    unit: default!(&str, "'second'"),
) -> AccessorIntegral<'static> {
    unsafe {
        flatten!{
            AccessorIntegral {
                // lengths beyond u32::MAX are unrepresentable; unwrap is a
                // genuine invariant violation, not expected input handling
                len: unit.len().try_into().unwrap(),
                bytes: unit.as_bytes().into(),
            }
        }
    }
}

67 changes: 67 additions & 0 deletions extension/src/duration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Utilities for working with durations. Parsing of duration units is intended to match how
//! PostgreSQL parses duration units. Currently units longer than an hour are unsupported since
//! the length of days varies when in a timezone with daylight saving time.

// Canonical PostgreSQL units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/include/utils/datetime.h#L48-L60
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DurationUnit {
    // Variant order defines the derived `Ord`: keep smallest -> largest.
    Microsec,
    Millisec,
    Second,
    Minute,
    Hour,
}

impl DurationUnit {
    /// Length of one of this unit, expressed in microseconds.
    fn microseconds(self) -> u32 {
        match self {
            Self::Microsec => 1,
            Self::Millisec => 1000,
            Self::Second => 1_000_000,
            Self::Minute => 60_000_000,
            Self::Hour => 3_600_000_000,
        }
    }

    /// Convert `amount` of this unit into the unit `to`.
    pub fn convert_unit(self, amount: f64, to: Self) -> f64 {
        // Route through microseconds as the common base unit.
        let as_microseconds = amount * f64::from(self.microseconds());
        as_microseconds / f64::from(to.microseconds())
    }

    /// Tries to get a duration unit from a string, returning `None` if no known unit matched.
    pub fn from_str(text: &str) -> Option<Self> {
        // Aliases for canonical units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/backend/utils/adt/datetime.c#L187-L247
        let normalized = text.to_lowercase();
        match normalized.as_str() {
            "usecond" | "microsecond" | "microseconds" | "microsecon" | "us" | "usec" | "useconds" | "usecs" => Some(Self::Microsec),
            "msecond" | "millisecond" | "milliseconds" | "millisecon" | "ms" | "msec" | "mseconds" | "msecs" => Some(Self::Millisec),
            "second" | "s" | "sec" | "seconds" | "secs" => Some(Self::Second),
            "minute" | "m" | "min" | "mins" | "minutes" => Some(Self::Minute),
            "hour" | "hours" | "h" | "hr" | "hrs" => Some(Self::Hour),
            _ => None,
        }
    }
}


#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn convert_unit() {
        // 75 seconds is exactly 1.25 minutes.
        let seconds = 75.0;
        let minutes = DurationUnit::Second.convert_unit(seconds, DurationUnit::Minute);
        assert_eq!(minutes, 1.25);
    }

    #[test]
    fn parse_unit() {
        // Abbreviations and arbitrary casing both resolve to a unit.
        assert_eq!(DurationUnit::from_str("usecs"), Some(DurationUnit::Microsec));
        assert_eq!(DurationUnit::from_str("MINUTE"), Some(DurationUnit::Minute));
        assert_eq!(DurationUnit::from_str("MiLlIsEcOn"), Some(DurationUnit::Millisec));
        // Unknown and empty strings match nothing.
        assert_eq!(DurationUnit::from_str("pahar"), None);
        assert_eq!(DurationUnit::from_str(""), None);
    }
}
1 change: 1 addition & 0 deletions extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod stabilization_info;
mod raw;
mod datum_utils;
mod pg_any_element;
mod duration;

#[cfg(any(test, feature = "pg_test"))]
mod aggregate_builder_tests;
Expand Down
9 changes: 8 additions & 1 deletion extension/src/stabilization_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ crate::functions_stabilized_at! {
accessorlasttime_out(accessorlasttime),
accessorlastval_in(cstring),
accessorlastval_out(accessorlastval),
accessorintegral_in(cstring),
accessorintegral_out(accessorintegral),
arrow_counter_agg_first_time(countersummary,accessorfirsttime),
arrow_counter_agg_first_val(countersummary,accessorfirstval),
arrow_counter_agg_last_time(countersummary,accessorlasttime),
Expand All @@ -28,6 +30,7 @@ crate::functions_stabilized_at! {
arrow_time_weight_first_val(timeweightsummary,accessorfirstval),
arrow_time_weight_last_time(timeweightsummary,accessorlasttime),
arrow_time_weight_last_val(timeweightsummary,accessorlastval),
arrow_time_weighted_average_integral(timeweightsummary,accessorintegral),
first_time(),
first_time(countersummary),
first_time(timeweightsummary),
Expand All @@ -40,7 +43,9 @@ crate::functions_stabilized_at! {
last_val(),
last_val(countersummary),
last_val(timeweightsummary),
}
integral(),
integral(timeweightsummary),
}
"1.9.0" => {
accessorapproxpercentile_in(cstring),
accessorapproxpercentile_out(accessorapproxpercentile),
Expand Down Expand Up @@ -422,6 +427,7 @@ crate::types_stabilized_at! {
accessorfirstval,
accessorlasttime,
accessorlastval,
accessorintegral,
}
"1.9.0" => {
accessorapproxpercentile,
Expand Down Expand Up @@ -504,6 +510,7 @@ crate::operators_stabilized_at! {
"->"(timeweightsummary,accessorfirstval),
"->"(timeweightsummary,accessorlasttime),
"->"(timeweightsummary,accessorlastval),
"->"(timeweightsummary,accessorintegral),
}
"1.9.0" => {
"->"(countersummary,accessorcorr),
Expand Down
63 changes: 59 additions & 4 deletions extension/src/time_weighted_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use serde::{Deserialize, Serialize};

use crate::{
accessors::{
AccessorAverage, AccessorFirstTime, AccessorFirstVal, AccessorLastTime, AccessorLastVal,
AccessorAverage, AccessorFirstTime, AccessorFirstVal, AccessorLastTime, AccessorLastVal, AccessorIntegral,
},
aggregate_utils::in_aggregate_context,
flatten,
palloc::{Inner, Internal, InternalAsValue, ToInternal},
pg_type, ron_inout_funcs,
pg_type, ron_inout_funcs, duration::DurationUnit,
};

use tspoint::TSPoint;
Expand Down Expand Up @@ -417,6 +417,15 @@ pub fn arrow_time_weighted_average_average(
time_weighted_average_average(sketch)
}

/// Arrow-operator form of `integral`: `time_weight(...) -> integral('hour')`.
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_time_weighted_average_integral(
    tws: Option<TimeWeightSummary>,
    accessor: AccessorIntegral,
) -> Option<f64> {
    // Recover the unit string the accessor carries as raw bytes, then
    // delegate to the named-function implementation.
    let unit = String::from_utf8_lossy(accessor.bytes.as_slice()).to_string();
    time_weighted_average_integral(tws, unit)
}


#[pg_extern(immutable, parallel_safe, name = "average")]
pub fn time_weighted_average_average(
Expand All @@ -438,6 +447,22 @@ pub fn time_weighted_average_average(
}
}

/// Evaluate the integral of a time-weight summary, converted to `unit`.
///
/// Errors out (PostgreSQL `ERROR`) on an unrecognized unit string; returns
/// `NULL` (`None`) when the summary itself is `NULL`.
#[pg_extern(immutable, parallel_safe, name = "integral")]
pub fn time_weighted_average_integral(
    tws: Option<TimeWeightSummary>,
    unit: String,
) -> Option<f64> {
    // Validate the unit before touching the summary so a bad unit always
    // reports an error, even for a NULL aggregate.
    let target_unit = match DurationUnit::from_str(&unit) {
        Some(parsed) => parsed,
        None => pgx::error!(
            "Unrecognized duration unit: {}. Valid units are: usecond, msecond, second, minute, hour",
            unit,
        ),
    };
    let summary = tws?;
    // The raw integral is accumulated in microseconds; convert on the way out.
    let integral_microsecs = summary.internal().time_weighted_integral();
    Some(DurationUnit::Microsec.convert_unit(integral_microsecs, target_unit))
}

#[pg_extern(immutable, parallel_safe, name = "interpolated_average", schema = "toolkit_experimental")]
pub fn time_weighted_average_interpolated_average(
tws: Option<TimeWeightSummary>,
Expand Down Expand Up @@ -478,8 +503,17 @@ mod tests {
let stmt = "CREATE TABLE test(ts timestamptz, val DOUBLE PRECISION); SET TIME ZONE 'UTC'";
client.select(stmt, None, None);

// add a couple points
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0), ('2020-01-01 00:01:00+00', 20.0)";
// add a point
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0)";
client.select(stmt, None, None);

let stmt = "SELECT integral(time_weight('Linear', ts, val), 'hrs') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);

// add another point
let stmt = "INSERT INTO test VALUES('2020-01-01 00:01:00+00', 20.0)";
client.select(stmt, None, None);

// test basic with 2 points
Expand Down Expand Up @@ -519,6 +553,11 @@ mod tests {
let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
assert!((select_one!(client, stmt, f64) - 15.0).abs() < f64::EPSILON);

let stmt = "SELECT integral(time_weight('Linear', ts, val), 'mins') FROM test";
assert!((select_one!(client, stmt, f64) - 60.0).abs() < f64::EPSILON);
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'hour') FROM test";
assert!((select_one!(client, stmt, f64) - 1.0).abs() < f64::EPSILON);

//non-evenly spaced values
let stmt = "INSERT INTO test VALUES('2020-01-01 00:08:00+00', 30.0), ('2020-01-01 00:10:00+00', 10.0), ('2020-01-01 00:10:30+00', 20.0), ('2020-01-01 00:20:00+00', 30.0)";
client.select(stmt, None, None);
Expand All @@ -532,16 +571,32 @@ mod tests {
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);

let stmt = "SELECT integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
let stmt = "SELECT time_weight('Linear', ts, val) \
->integral('microseconds') \
FROM test";
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);

let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
// expected = (10 + 20 + 10 + 20 + 10*4 + 30*2 +10*.5 + 20*9.5) / 20 = 17.75 using last value and carrying for each point
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 21300000.0).abs() < f64::EPSILON);

//make sure this works with whatever ordering we throw at it
let stmt = "SELECT average(time_weight('Linear', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
let stmt = "SELECT average(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500.0).abs() < f64::EPSILON);
let stmt = "SELECT integral(time_weight('LOCF', ts, val ORDER BY random()), 'seconds') FROM test";
assert!((select_one!(client, stmt, f64) - 21300.0).abs() < f64::EPSILON);

// make sure we get the same result if we do multi-level aggregation
let stmt = "WITH t AS (SELECT date_trunc('minute', ts), time_weight('Linear', ts, val) AS tws FROM test GROUP BY 1) SELECT average(rollup(tws)) FROM t";
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
Expand Down

0 comments on commit 6882a6f

Please sign in to comment.