Skip to content

Commit

Permalink
fixing interpolation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Rowe committed Sep 3, 2022
1 parent 56da41d commit 7f8996d
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 9 deletions.
61 changes: 59 additions & 2 deletions extension/src/counter_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl<'input> CounterSummary<'input> {
prev: Option<CounterSummary>,
next: Option<CounterSummary>,
) -> CounterSummary<'static> {
let prev = prev.map(|summary| {
let prev = if self.first.ts > interval_start {
prev.map(|summary| {
let first = if summary.last.val > self.first.val {
TSPoint{ ts: summary.last.ts, val: 0.}
} else {
Expand All @@ -101,7 +102,10 @@ impl<'input> CounterSummary<'input> {
time_weighted_average::TimeWeightMethod::Linear
.interpolate(first, Some(self.first), interval_start)
.expect("unable to interpolate lower bound")
});
})
} else {
None
};

let next = next.map(|summary| {
let last = if self.last.val > summary.first.val {
Expand Down Expand Up @@ -1221,6 +1225,7 @@ mod tests {
deltas.next().unwrap()[1].value(),
Some(35. + 30. - 27.5)
);
assert!(deltas.next().is_none());

let mut rates = client.select(
r#"SELECT
Expand Down Expand Up @@ -1255,6 +1260,58 @@ mod tests {
rates.next().unwrap()[1].value(),
Some((35. + 30. - 27.5)/(16. * 60. * 60.))
);
assert!(rates.next().is_none());
});
}

#[pg_test]
fn interpolated_delta_with_aligned_point() {
Spi::execute(|client| {
client.select(
"CREATE TABLE test(time timestamptz, value double precision, bucket timestamptz)",
None,
None,
);
client.select(
r#"INSERT INTO test VALUES
('2020-1-1 10:00'::timestamptz, 10.0, '2020-1-1'::timestamptz),
('2020-1-1 12:00'::timestamptz, 40.0, '2020-1-1'::timestamptz),
('2020-1-1 16:00'::timestamptz, 20.0, '2020-1-1'::timestamptz),
('2020-1-2 0:00'::timestamptz, 15.0, '2020-1-2'::timestamptz),
('2020-1-2 12:00'::timestamptz, 50.0, '2020-1-2'::timestamptz),
('2020-1-2 20:00'::timestamptz, 25.0, '2020-1-2'::timestamptz)"#,
None,
None,
);

let mut deltas = client.select(
r#"SELECT
toolkit_experimental.interpolated_delta(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket)
) FROM (
SELECT bucket, counter_agg(time, value) as agg
FROM test
GROUP BY bucket
) s
ORDER BY bucket"#,
None,
None,
);
// Day 1, start at 10, interpolated end of day is 15 (after reset), reset at 40 and 20
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(15. + 40. + 20. - 10.)
);
// Day 2, start is 15, end is 25, reset at 50
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(25. + 50. - 15.)
);
assert!(deltas.next().is_none());
});
}

Expand Down
65 changes: 60 additions & 5 deletions extension/src/gauge_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ mod toolkit_experimental {
let prev = prev.map(MetricSummary::from);
let next = next.map(MetricSummary::from);

let prev = prev.map(|summary|
time_weighted_average::TimeWeightMethod::Linear
.interpolate(summary.last, Some(this.first), interval_start)
.expect("unable to interpolate lower bound")
);
let prev = if this.first.ts > interval_start {
prev.map(|summary|
time_weighted_average::TimeWeightMethod::Linear
.interpolate(summary.last, Some(this.first), interval_start)
.expect("unable to interpolate lower bound")
)
} else {
None
};

let next = next.map(|summary|
time_weighted_average::TimeWeightMethod::Linear
Expand Down Expand Up @@ -1052,6 +1056,57 @@ mod tests {
});
}

#[pg_test]
fn guage_agg_interpolated_delta_with_aligned_point() {
Spi::execute(|client| {
client.select(
"CREATE TABLE test(time timestamptz, value double precision, bucket timestamptz)",
None,
None,
);
client.select(
r#"INSERT INTO test VALUES
('2020-1-1 10:00'::timestamptz, 10.0, '2020-1-1'::timestamptz),
('2020-1-1 12:00'::timestamptz, 40.0, '2020-1-1'::timestamptz),
('2020-1-1 16:00'::timestamptz, 20.0, '2020-1-1'::timestamptz),
('2020-1-2 0:00'::timestamptz, 15.0, '2020-1-2'::timestamptz),
('2020-1-2 12:00'::timestamptz, 50.0, '2020-1-2'::timestamptz),
('2020-1-2 20:00'::timestamptz, 25.0, '2020-1-2'::timestamptz)"#,
None,
None,
);

let mut deltas = client.select(
r#"SELECT
toolkit_experimental.interpolated_delta(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket)
) FROM (
SELECT bucket, toolkit_experimental.gauge_agg(time, value) as agg
FROM test
GROUP BY bucket
) s
ORDER BY bucket"#,
None,
None,
);
// Day 1, start at 10, interpolated end of day is 15 (after reset)
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(15. - 10.)
);
// Day 2, start is 15, end is 25
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(25. - 15.)
);
assert!(deltas.next().is_none());
});
}

// TODO https://github.com/timescale/timescaledb-toolkit/issues/362
// TODO why doesn't this catch the error under github actions?
// #[pg_test(error = "returned Datum was NULL")]
Expand Down
4 changes: 2 additions & 2 deletions extension/src/time_weighted_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'input> TimeWeightSummary<'input> {
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
) -> TimeWeightSummary<'static> {
assert!(interval_start <= self.first.ts && interval_start + interval_len >= self.last.ts);
assert!(interval_start <= self.first.ts && interval_start + interval_len > self.last.ts);
let mut new_sum = self.weighted_sum;
let new_start = match prev {
Some(prev) if interval_start < self.first.ts => {
Expand All @@ -62,7 +62,7 @@ impl<'input> TimeWeightSummary<'input> {
_ => self.first
};
let new_end = match next {
Some(next) if self.last.ts < interval_start + interval_len => {
Some(next) => {
let new_end =
self.method
.interpolate(self.last, Some(next.first), interval_start + interval_len)
Expand Down

0 comments on commit 7f8996d

Please sign in to comment.