From 69b9e02e8e18373a87926b487a5fccce708d18da Mon Sep 17 00:00:00 2001 From: Eric Gillespie Date: Mon, 28 Nov 2022 13:39:50 -0600 Subject: [PATCH] Add experimental library for client-side use, with examples. For issue #485. --- Cargo.lock | 9 ++ Cargo.toml | 15 +--- crates/t-digest-lib/Cargo.toml | 13 +++ crates/t-digest-lib/src/lib.rs | 57 ++++++++++++ crates/t-digest/Cargo.toml | 1 + crates/t-digest/src/lib.rs | 88 ++++++++++++++++++ docs/client.md | 124 +++++++++++++++++++++++++ docs/examples/tdigest.c | 68 ++++++++++++++ docs/examples/tdigest.py | 86 ++++++++++++++++++ extension/src/accessors/tests.rs | 21 +++++ extension/src/tdigest.rs | 150 +++++++++++++++---------------- 11 files changed, 538 insertions(+), 94 deletions(-) create mode 100644 crates/t-digest-lib/Cargo.toml create mode 100644 crates/t-digest-lib/src/lib.rs create mode 100644 docs/client.md create mode 100644 docs/examples/tdigest.c create mode 100644 docs/examples/tdigest.py create mode 100644 extension/src/accessors/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 63e5d18c..85a968ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,9 +1892,18 @@ dependencies = [ "ordered-float", "quickcheck", "quickcheck_macros", + "ron", "serde", ] +[[package]] +name = "tdigest-lib" +version = "0.0.0" +dependencies = [ + "libc", + "tdigest", +] + [[package]] name = "termcolor" version = "1.1.3" diff --git a/Cargo.toml b/Cargo.toml index 1b28423c..681b2258 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,24 +2,11 @@ resolver = "2" members = [ + "crates/t-digest-lib", "extension", - "crates/encodings", - "crates/flat_serialize/flat_serialize_macro", - "crates/flat_serialize/flat_serialize", - "crates/t-digest", - "crates/hyperloglogplusplus", - "crates/udd-sketch", - "crates/time-weighted-average", "tools/post-install", "tools/sql-doctester", "tools/update-tester", - "crates/asap", - "crates/counter-agg", - "crates/tspoint", - "crates/stats-agg", - "crates/aggregate_builder", - "crates/scripting-utilities/*", 
- "crates/count-min-sketch", ] [profile.release] diff --git a/crates/t-digest-lib/Cargo.toml b/crates/t-digest-lib/Cargo.toml new file mode 100644 index 00000000..c4af47ad --- /dev/null +++ b/crates/t-digest-lib/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tdigest-lib" +version = "0.0.0" +edition = "2021" + +[lib] +name = "timescaledb_toolkit_tdigest" +crate-type = ["cdylib", "staticlib"] + +[dependencies] +libc = "0.2.135" + +tdigest = { path="../t-digest" } diff --git a/crates/t-digest-lib/src/lib.rs b/crates/t-digest-lib/src/lib.rs new file mode 100644 index 00000000..b5f62667 --- /dev/null +++ b/crates/t-digest-lib/src/lib.rs @@ -0,0 +1,57 @@ +// There is no safety here: it's all in the hands of the caller, bless their heart. +#![allow(clippy::missing_safety_doc)] + +#[no_mangle] +pub extern "C" fn timescaledb_toolkit_tdigest_builder_with_size( + size: usize, +) -> Box { + Box::new(tdigest::Builder::with_size(size)) +} + +#[no_mangle] +pub unsafe extern "C" fn timescaledb_toolkit_tdigest_push( + builder: *mut tdigest::Builder, + value: f64, +) { + (*builder).push(value) +} + +// TODO Don't abort the process if `builder` and `other` weren't created with the same size. +#[no_mangle] +pub unsafe extern "C" fn timescaledb_toolkit_tdigest_merge( + builder: *mut tdigest::Builder, + other: Box, +) { + let other = *other; + (*builder).merge(other) +} + +#[no_mangle] +pub extern "C" fn timescaledb_toolkit_tdigest_builder_free(_: Box) {} + +#[no_mangle] +pub extern "C" fn timescaledb_toolkit_tdigest_build( + mut builder: Box, +) -> Box { + Box::new(builder.build()) +} + +#[no_mangle] +pub extern "C" fn timescaledb_toolkit_tdigest_free(_: Box) {} + +// TODO Messy, but good enough to experiment with. We might want to +// into_raw_parts the String and offer a transparent struct containing pointer +// to and size of the buffer, with a ts_tk_tdigest_string_free taking it back +// and releasing it. That also avoids one copy. 
+#[no_mangle] +pub unsafe extern "C" fn timescaledb_toolkit_tdigest_format_for_postgres( + td: *const tdigest::TDigest, +) -> *mut libc::c_char { + let s = (*td).format_for_postgres(); + let buf = libc::malloc(s.len() + 1); + libc::memcpy(buf, s.as_ptr() as *const libc::c_void, s.len()); + let buf = buf as *mut libc::c_char; + let r = std::slice::from_raw_parts_mut(buf, s.len() + 1); + r[s.len()] = 0; + buf +} diff --git a/crates/t-digest/Cargo.toml b/crates/t-digest/Cargo.toml index 21116aaf..da19fed6 100644 --- a/crates/t-digest/Cargo.toml +++ b/crates/t-digest/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" flat_serialize = {path="../flat_serialize/flat_serialize"} flat_serialize_macro = {path="../flat_serialize/flat_serialize_macro"} ordered-float = {version = "1.0", features = ["serde"] } +ron = "0.6.0" serde = { version = "1.0", features = ["derive"] } [dev-dependencies] diff --git a/crates/t-digest/src/lib.rs b/crates/t-digest/src/lib.rs index 1dd787cb..8adfae66 100644 --- a/crates/t-digest/src/lib.rs +++ b/crates/t-digest/src/lib.rs @@ -205,6 +205,35 @@ impl TDigest { pub fn num_buckets(&self) -> usize { self.centroids.len() } + + pub fn format_for_postgres(&self) -> String { + /// Mimicks the version-1 serialization format the extension uses. TODO don't! + #[derive(Serialize)] + struct Hack { + version: u32, + buckets: usize, + max_buckets: usize, + count: u64, + sum: f64, + min: f64, + max: f64, + centroids: Vec, + } + + let max_buckets = self.max_size(); + let centroids = self.raw_centroids(); + ron::to_string(&Hack { + version: 1, + max_buckets, + buckets: centroids.len(), + count: self.count(), + sum: self.sum(), + min: self.min(), + max: self.max(), + centroids: centroids.to_vec(), + }) + .unwrap() + } } impl Default for TDigest { @@ -649,6 +678,65 @@ impl TDigest { } } +// This is a tdigest object paired +// with a vector of values that still need to be inserted. 
+#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct Builder { + #[serde(skip)] + buffer: Vec, + digested: TDigest, +} + +impl From for Builder { + fn from(digested: TDigest) -> Self { + Self { + digested, + ..Default::default() + } + } +} + +impl Builder { + pub fn with_size(size: usize) -> Self { + Self::from(TDigest::new_with_size(size)) + } + + // Add a new value, recalculate the digest if we've crossed a threshold. + // TODO threshold is currently set to number of digest buckets, should this be adjusted + pub fn push(&mut self, value: f64) { + self.buffer.push(value); + if self.buffer.len() >= self.digested.max_size() { + self.digest() + } + } + + // Update the digest with all accumulated values. + fn digest(&mut self) { + if self.buffer.is_empty() { + return; + } + let new = std::mem::take(&mut self.buffer); + self.digested = self.digested.merge_unsorted(new) + } + + pub fn build(&mut self) -> TDigest { + self.digest(); + std::mem::take(&mut self.digested) + } + + pub fn merge(&mut self, other: Self) { + assert_eq!(self.digested.max_size(), other.digested.max_size()); + let digvec = vec![std::mem::take(&mut self.digested), other.digested]; + if !self.buffer.is_empty() { + digvec[0].merge_unsorted(std::mem::take(&mut self.buffer)); + } + if !other.buffer.is_empty() { + digvec[1].merge_unsorted(other.buffer); + } + self.digested = TDigest::merge_digests(digvec); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/docs/client.md b/docs/client.md new file mode 100644 index 00000000..92b0443b --- /dev/null +++ b/docs/client.md @@ -0,0 +1,124 @@ +# Client-side aggregation [experimental](/docs/README.md#tag-notes) + +- Current status: prototype +- Effort remaining: lots + +## Purpose + +We have long suspected it might be valuable to allow building aggregates +client-side rather than requiring all data be stored in postgres and +aggregated within the toolkit. 
+ +https://github.com/timescale/timescaledb-toolkit/issues/485 recently came in +adding weight to this idea. Because this customer requests tdigest, that's +what we'll use for prototyping. + +## Use cases + +Quoting the above customer: + +"In some cases it is not possible to transfer all the non-aggregated data to +TimescaleDB due to it's amount and/or limited connectivity." + +## Questions + +- Do we want to support a public crate? + - What does that mean? + - Do we need to monitor an email address? + - What promise would we make on response time? + - Is this materially different from what we've already signed up for by + publishing on github? + - How do we handle ownership of the crates.io credentials? + +- Which license do we use? + - Some of our code is already a derived work - do we permissively license it + all, or restrict some of it? + +- Wire protocol maintenance + - This is a problem we already have, we just didn't realize it, as it is + already possible to construct our aggregates and INSERT them, and they + also appear in pg dumps; at the moment, you can restore those dumps, though we + haven't made any promise about it. On our stabilized aggregates, users + may assume that is stabilized, too. + - Is there a practical concern here? Or do we just say "not supported"? + - Is it possible to crash the extension with invalid inputs? + - If we commit to a public wire protocol, shouldn't we avoid the + Rust-specific ron and go for something more common? + +## Proposal + +As a first step, build a crate which externalizes tdigest aggregate creation. + +```rust +let mut digester = tdigest::Builder::with_size(N); +loop { + digester.push(value); +} +send_to_postgres(format!("INSERT INTO digests VALUES ({})", digester.build().format_for_postgres())); +``` + +In order to provide that API, we must first reorganize the tdigest +implementation so that all business logic is in the tdigest crate. Some is +currently in the pgx extension crate. 
+ +For each aggregate, the transient state is actually a Builder pattern hidden behind pgx machinery. + +On this branch, I've moved TDigestTransState into tdigest::Builder. + +Currently, we use default ron behavior to serialize the raw implementation +details of the pg_type. Users can insert inconsistent data now, and it +doesn't look like we validate that at insertion time. + +We should reconsider this for all pg_types regardless of the overall client +project. Is it possible NOT to offer serialized insertion at all? If so, +turning that off would be a good first step. + +Then we can enable it just where we want to. + +We should put more thought into the serialization format we intentionally +support. Currently it contains redundancy which we can eliminate by +implementing serialization carefully rather than relying on defaults. + +## Proof of concept + +This is a simple demonstration of inserting serialized tdigest into a table, +showing that it works the same way as an aggregate built by the extension. 
+ +```SQL ,non-transactional +CREATE TABLE test (data DOUBLE PRECISION); +INSERT INTO test SELECT generate_series(0.01, 1, 0.01); + +CREATE VIEW digest AS SELECT tdigest(100, data) FROM test; + +CREATE TABLE digest2 (tdigest tdigest); +INSERT INTO digest2 VALUES ('(version:1,max_buckets:100,count:100,sum:50.50000000000001,min:0.01,max:1,centroids:[(mean:0.01,weight:1),(mean:0.02,weight:1),(mean:0.03,weight:1),(mean:0.04,weight:1),(mean:0.05,weight:1),(mean:0.06,weight:1),(mean:0.07,weight:1),(mean:0.08,weight:1),(mean:0.09,weight:1),(mean:0.1,weight:1),(mean:0.11,weight:1),(mean:0.12,weight:1),(mean:0.13,weight:1),(mean:0.14,weight:1),(mean:0.15,weight:1),(mean:0.16,weight:1),(mean:0.17,weight:1),(mean:0.18,weight:1),(mean:0.19,weight:1),(mean:0.2,weight:1),(mean:0.21,weight:1),(mean:0.22,weight:1),(mean:0.23,weight:1),(mean:0.24,weight:1),(mean:0.25,weight:1),(mean:0.26,weight:1),(mean:0.27,weight:1),(mean:0.28,weight:1),(mean:0.29,weight:1),(mean:0.3,weight:1),(mean:0.31,weight:1),(mean:0.32,weight:1),(mean:0.33,weight:1),(mean:0.34,weight:1),(mean:0.35,weight:1),(mean:0.36,weight:1),(mean:0.37,weight:1),(mean:0.38,weight:1),(mean:0.39,weight:1),(mean:0.4,weight:1),(mean:0.41,weight:1),(mean:0.42,weight:1),(mean:0.43,weight:1),(mean:0.44,weight:1),(mean:0.45,weight:1),(mean:0.46,weight:1),(mean:0.47,weight:1),(mean:0.48,weight:1),(mean:0.49,weight:1),(mean:0.5,weight:1),(mean:0.51,weight:1),(mean:0.525,weight:2),(mean:0.545,weight:2),(mean:0.565,weight:2),(mean:0.585,weight:2),(mean:0.605,weight:2),(mean:0.625,weight:2),(mean:0.64,weight:1),(mean:0.655,weight:2),(mean:0.675,weight:2),(mean:0.69,weight:1),(mean:0.705,weight:2),(mean:0.72,weight:1),(mean:0.735,weight:2),(mean:0.75,weight:1),(mean:0.76,weight:1),(mean:0.775,weight:2),(mean:0.79,weight:1),(mean:0.8,weight:1),(mean:0.815,weight:2),(mean:0.83,weight:1),(mean:0.84,weight:1),(mean:0.85,weight:1),(mean:0.86,weight:1),(mean:0.87,weight:1),(mean:0.88,weight:1),(mean:0.89,weight:1),(mean:0.9,weight:1),(mean:0
.91,weight:1),(mean:0.92,weight:1),(mean:0.93,weight:1),(mean:0.94,weight:1),(mean:0.95,weight:1),(mean:0.96,weight:1),(mean:0.97,weight:1),(mean:0.98,weight:1),(mean:0.99,weight:1),(mean:1,weight:1)])'); +``` + +```SQL +SELECT + min_val(tdigest), + max_val(tdigest), + num_vals(tdigest) + FROM digest; +``` +```output + min_val | max_val | num_vals +---------+---------+---------- + 0.01 | 1 | 100 +``` + +Inserting serialized tdigest into table behaves the same: + +```SQL +SELECT + min_val(tdigest), + max_val(tdigest), + num_vals(tdigest) + FROM digest2; +``` +```output + min_val | max_val | num_vals +---------+---------+---------- + 0.01 | 1 | 100 +``` diff --git a/docs/examples/tdigest.c b/docs/examples/tdigest.c new file mode 100644 index 00000000..bfdab004 --- /dev/null +++ b/docs/examples/tdigest.c @@ -0,0 +1,68 @@ +// cc -o tdigest tdigest.c $CARGO_TARGET_DIR/$PROFILE/libtimescaledb_toolkit_tdigest.a -lm -lpthread -ldl + +// Sample program which prints the expected output of the test_tdigest_io test. + +//////////////////////////////////////////////////////////////////////////////// +// TODO Generate a header from tdigest-lib crate. + +#include + +struct TDigestBuilder; +struct TDigest; + +// Return pointer to new TDigestBuilder. +// MUST NOT be passed to free(3). Instead, pass to timescaledb_toolkit_tdigest_builder_free to +// discard or to timescaledb_toolkit_tdigest_build to convert to TDigest. +// Never returns NULL. +struct TDigestBuilder * +timescaledb_toolkit_tdigest_builder_with_size(size_t size); + +void +timescaledb_toolkit_tdigest_push(struct TDigestBuilder *builder, double value); + +void +timescaledb_toolkit_tdigest_merge(struct TDigestBuilder *builder, struct TDigestBuilder *other); + +// Free a TDigestBuilder that has not been built. +// MUST NOT be passed NULL. +void +timescaledb_toolkit_tdigest_builder_free(struct TDigestBuilder *builder); + +// Return pointer to new TDigest built from builder. 
+// builder MUST NOT be passed to timescaledb_toolkit_tdigest_builder_free . +struct TDigest * +timescaledb_toolkit_tdigest_build(struct TDigestBuilder *builder); + +// Free a TDigest. +void +timescaledb_toolkit_tdigest_free(struct TDigest *td); + +// Return pointer to null-terminated buffer containing ASCII serialization of TDigest suitable for +// use with postgresql INSERT. +// Free the buffer with free(3). +char * +timescaledb_toolkit_tdigest_format_for_postgres(struct TDigest *td); + +//////////////////////////////////////////////////////////////////////////////// + +#include +#include + +int +main() +{ + struct TDigestBuilder *builder = timescaledb_toolkit_tdigest_builder_with_size(100); + double value; + for (value = 1.0; value <= 100.0; value++) { + timescaledb_toolkit_tdigest_push(builder, value); + } + + struct TDigest *td = timescaledb_toolkit_tdigest_build(builder); + char *formatted = timescaledb_toolkit_tdigest_format_for_postgres(td); + printf("%s\n", formatted); + free(formatted); + + timescaledb_toolkit_tdigest_free(td); + + return 0; +} diff --git a/docs/examples/tdigest.py b/docs/examples/tdigest.py new file mode 100644 index 00000000..fe136513 --- /dev/null +++ b/docs/examples/tdigest.py @@ -0,0 +1,86 @@ +import ctypes +import os + +_cdll = ctypes.CDLL(os.path.join( + os.getenv('CARGO_TARGET_DIR', 'target'), + os.getenv('PROFILE', 'debug'), + 'libtimescaledb_toolkit_tdigest.so')) +_cdll.timescaledb_toolkit_tdigest_builder_with_size.restype = ctypes.c_void_p +_cdll.timescaledb_toolkit_tdigest_build.restype = ctypes.c_void_p +_cdll.timescaledb_toolkit_tdigest_format_for_postgres.restype = ctypes.POINTER(ctypes.c_char) +_cdll.timescaledb_toolkit_tdigest_push.restype = None +_cdll.timescaledb_toolkit_tdigest_merge.restype = None +_cdll.timescaledb_toolkit_tdigest_builder_free.restype = None +_cdll.timescaledb_toolkit_tdigest_free.restype = None + +# Wrapper classes use `real_pointer` to keep hold of the real pointer for as +# long as it needs to be 
released. +# We copy it to self.pointer to enforce use of `with` (as much as anything can be enforced in Python). +# Attempting to forego `with` results in `AttributeError`. + +class TDigest: + + class Builder: + def __init__(self, pointer): + self.real_pointer = pointer + + def __enter__(self): + self.pointer = self.real_pointer + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__del__() + self.real_pointer = None + if 'pointer' in self.__dict__: + del self.__dict__['pointer'] + + def __del__(self): + if self.real_pointer is not None: + _cdll.timescaledb_toolkit_tdigest_builder_free(self.real_pointer) + + def with_size(size): + return TDigest.Builder(ctypes.c_void_p(_cdll.timescaledb_toolkit_tdigest_builder_with_size(ctypes.c_size_t(size)))) + + def push(self, value): + _cdll.timescaledb_toolkit_tdigest_push(self.pointer, ctypes.c_double(value)) + + def build(self): + td = TDigest(ctypes.c_void_p(_cdll.timescaledb_toolkit_tdigest_build(self.pointer))) + self.real_pointer = None + del self.__dict__['pointer'] + return td + + def __init__(self, pointer): + self.real_pointer = pointer + + def __enter__(self): + self.pointer = self.real_pointer + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.__del__() + self.real_pointer = None + if 'pointer' in self.__dict__: + del self.__dict__['pointer'] + + def __del__(self): + if self.real_pointer is not None: + _cdll.timescaledb_toolkit_tdigest_free(self.real_pointer) + + def format_for_postgres(self): + buf = _cdll.timescaledb_toolkit_tdigest_format_for_postgres(self.pointer) + s = ctypes.cast(buf, ctypes.c_char_p).value.decode('ascii') + # TODO free(3) left as an exercise to the reader. This is for GNU libc on Linux/amd64: + ctypes.CDLL('libc.so.6').free(buf) + return s + +# Sample program which prints the expected output of the test_tdigest_io test. 
+def test(): + with TDigest.Builder.with_size(100) as builder: + for value in range(1, 101): + builder.push(value) + with builder.build() as td: + print(td.format_for_postgres()) + +if __name__ == '__main__': + test() diff --git a/extension/src/accessors/tests.rs b/extension/src/accessors/tests.rs new file mode 100644 index 00000000..8182115d --- /dev/null +++ b/extension/src/accessors/tests.rs @@ -0,0 +1,21 @@ +use pgx::*; + +use super::accessor; + +//use crate::{accessor, build}; + +// TODO don't require that trailing comma +accessor! { one_field(value: f64,) } +accessor! { two_fields(a: f64, b: i64,) } + +#[test] +fn one_field_works() { + let d: AccessorOneField = accessor_one_field(1.0); + assert_eq!(1.0, d.value); +} + +#[test] +fn two_field_works() { + let d: AccessorTwoFields = accessor_two_fields(1.0, 2); + assert_eq!((1.0, 2), (d.a, d.b)); +} diff --git a/extension/src/tdigest.rs b/extension/src/tdigest.rs index d5fd507e..e1e235c6 100644 --- a/extension/src/tdigest.rs +++ b/extension/src/tdigest.rs @@ -1,6 +1,4 @@ -use std::{convert::TryInto, mem::take, ops::Deref}; - -use serde::{Deserialize, Serialize}; +use std::{convert::TryInto, ops::Deref}; use pgx::*; @@ -12,40 +10,11 @@ use crate::{ aggregate_utils::in_aggregate_context, flatten, palloc::{Inner, Internal, InternalAsValue, ToInternal}, - pg_type, ron_inout_funcs, + pg_type, }; use tdigest::{Centroid, TDigest as InternalTDigest}; -// Intermediate state kept in postgres. This is a tdigest object paired -// with a vector of values that still need to be inserted. -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct TDigestTransState { - #[serde(skip)] - buffer: Vec, - digested: InternalTDigest, -} - -impl TDigestTransState { - // Add a new value, recalculate the digest if we've crossed a threshold. 
- // TODO threshold is currently set to number of digest buckets, should this be adjusted - fn push(&mut self, value: f64) { - self.buffer.push(value); - if self.buffer.len() >= self.digested.max_size() { - self.digest() - } - } - - // Update the digest with all accumulated values. - fn digest(&mut self) { - if self.buffer.is_empty() { - return; - } - let new = take(&mut self.buffer); - self.digested = self.digested.merge_unsorted(new) - } -} - // PG function for adding values to a digest. // Null values are ignored. #[pg_extern(immutable, parallel_safe)] @@ -58,11 +27,11 @@ pub fn tdigest_trans( tdigest_trans_inner(unsafe { state.to_inner() }, size, value, fcinfo).internal() } pub fn tdigest_trans_inner( - state: Option>, + state: Option>, size: i32, value: Option, fcinfo: pg_sys::FunctionCallInfo, -) -> Option> { +) -> Option> { unsafe { in_aggregate_context(fcinfo, || { let value = match value { @@ -77,11 +46,7 @@ pub fn tdigest_trans_inner( } }; let mut state = match state { - None => TDigestTransState { - buffer: vec![], - digested: InternalTDigest::new_with_size(size.try_into().unwrap()), - } - .into(), + None => tdigest::Builder::with_size(size.try_into().unwrap()).into(), Some(state) => state, }; state.push(value); @@ -101,34 +66,19 @@ pub fn tdigest_combine( } pub fn tdigest_combine_inner( - state1: Option>, - state2: Option>, + state1: Option>, + state2: Option>, fcinfo: pg_sys::FunctionCallInfo, -) -> Option> { +) -> Option> { unsafe { - in_aggregate_context(fcinfo, || { - match (state1, state2) { - (None, None) => None, - (None, Some(state2)) => Some(state2.clone().into()), - (Some(state1), None) => Some(state1.clone().into()), - (Some(state1), Some(state2)) => { - assert_eq!(state1.digested.max_size(), state2.digested.max_size()); - let digvec = vec![state1.digested.clone(), state2.digested.clone()]; - if !state1.buffer.is_empty() { - digvec[0].merge_unsorted(state1.buffer.clone()); // merge_unsorted should take a reference - } - if 
!state2.buffer.is_empty() { - digvec[1].merge_unsorted(state2.buffer.clone()); - } - - Some( - TDigestTransState { - buffer: vec![], - digested: InternalTDigest::merge_digests(digvec), - } - .into(), - ) - } + in_aggregate_context(fcinfo, || match (state1, state2) { + (None, None) => None, + (None, Some(state2)) => Some(state2.clone().into()), + (Some(state1), None) => Some(state1.clone().into()), + (Some(state1), Some(state2)) => { + let mut merged = state1.clone(); + merged.merge(state2.clone()); + Some(merged.into()) } }) } @@ -138,23 +88,27 @@ use crate::raw::bytea; #[pg_extern(immutable, parallel_safe, strict)] pub fn tdigest_serialize(state: Internal) -> bytea { - let state: &mut TDigestTransState = unsafe { state.get_mut().unwrap() }; - state.digest(); - crate::do_serialize!(state) + let state: &mut tdigest::Builder = unsafe { state.get_mut().unwrap() }; + // TODO this macro is really broken + let hack = state.build(); + let hackref = &hack; + crate::do_serialize!(hackref) } #[pg_extern(strict, immutable, parallel_safe)] pub fn tdigest_deserialize(bytes: bytea, _internal: Internal) -> Option { tdigest_deserialize_inner(bytes).internal() } -pub fn tdigest_deserialize_inner(bytes: bytea) -> Inner { - crate::do_deserialize!(bytes, TDigestTransState) +pub fn tdigest_deserialize_inner(bytes: bytea) -> Inner { + crate::do_deserialize!(bytes, tdigest::Builder) } // PG object for the digest. pg_type! { #[derive(Debug)] struct TDigest<'input> { + // We compute this. It's a (harmless) bug that we serialize it. + #[serde(skip_deserializing)] buckets: u32, max_buckets: u32, count: u64, @@ -165,7 +119,33 @@ pg_type! 
{ } } -ron_inout_funcs!(TDigest); +impl<'input> InOutFuncs for TDigest<'input> { + fn output(&self, buffer: &mut StringInfo) { + use crate::serialization::{str_to_db_encoding, EncodedStr::*}; + + let stringified = ron::to_string(&**self).unwrap(); + match str_to_db_encoding(&stringified) { + Utf8(s) => buffer.push_str(s), + Other(s) => buffer.push_bytes(s.to_bytes()), + } + } + + fn input(input: &pgx::cstr_core::CStr) -> TDigest<'input> + where + Self: Sized, + { + use crate::serialization::str_from_db_encoding; + + let input = str_from_db_encoding(input); + let mut val: TDigestData = ron::from_str(input).unwrap(); + val.buckets = val + .centroids + .len() + .try_into() + .expect("centroids len fits into u32"); + unsafe { Self(val, crate::type_builder::CachedDatum::None).flatten() } + } +} impl<'input> TDigest<'input> { fn to_internal_tdigest(&self) -> InternalTDigest { @@ -200,18 +180,16 @@ impl<'input> TDigest<'input> { } } -// PG function to generate a user-facing TDigest object from an internal TDigestTransState. +// PG function to generate a user-facing TDigest object from an internal tdigest::Builder. 
#[pg_extern(immutable, parallel_safe)] fn tdigest_final(state: Internal, fcinfo: pg_sys::FunctionCallInfo) -> Option> { unsafe { in_aggregate_context(fcinfo, || { - let state: &mut TDigestTransState = match state.get_mut() { + let state: &mut tdigest::Builder = match state.get_mut() { None => return None, Some(state) => state, }; - state.digest(); - - TDigest::from_internal_tdigest(&state.digested).into() + TDigest::from_internal_tdigest(&state.build()).into() }) } } @@ -600,6 +578,19 @@ mod tests { }); } + #[pg_test] + fn serialization_matches() { + let mut t = InternalTDigest::new_with_size(10); + let vals = vec![1.0, 1.0, 1.0, 2.0, 1.0, 1.0]; + for v in vals { + t = t.merge_unsorted(vec![v]); + } + let pgt = TDigest::from_internal_tdigest(&t); + let mut si = StringInfo::new(); + pgt.output(&mut si); + assert_eq!(t.format_for_postgres(), si.to_string()); + } + #[pg_test] fn test_tdigest_io() { Spi::execute(|client| { @@ -655,11 +646,10 @@ mod tests { assert_eq!(buffer, expected); let expected = pgx::varlena::rust_byte_slice_to_bytea(&expected); - let new_state = + let mut new_state = tdigest_deserialize_inner(bytea(pg_sys::Datum::from(expected.as_ptr()))); - control.digest(); // Serialized form is always digested - assert_eq!(&*new_state, &*control); + assert_eq!(new_state.build(), control.build()); } }