Skip to content

Commit

Permalink
Add experimental library for client-side use, with examples.
Browse files Browse the repository at this point in the history
For issue #485.
  • Loading branch information
epgts committed Nov 18, 2022
1 parent bd9b1d9 commit e0e62e1
Show file tree
Hide file tree
Showing 11 changed files with 541 additions and 94 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 1 addition & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,11 @@
resolver = "2"

members = [
"crates/t-digest-lib",
"extension",
"crates/encodings",
"crates/flat_serialize/flat_serialize_macro",
"crates/flat_serialize/flat_serialize",
"crates/t-digest",
"crates/hyperloglogplusplus",
"crates/udd-sketch",
"crates/time-weighted-average",
"tools/post-install",
"tools/sql-doctester",
"tools/update-tester",
"crates/asap",
"crates/counter-agg",
"crates/tspoint",
"crates/stats-agg",
"crates/aggregate_builder",
"crates/scripting-utilities/*",
"crates/count-min-sketch",
]

[profile.release]
Expand Down
13 changes: 13 additions & 0 deletions crates/t-digest-lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
# NOTE(review): package name is "tdigest-lib" but the directory is
# crates/t-digest-lib — presumably intentional; confirm before publishing.
name = "tdigest-lib"
version = "0.0.0"
edition = "2021"

[lib]
# C-friendly library name for the exported symbols.
name = "timescaledb_toolkit_tdigest"
# Build both a C dynamic library and a static archive so non-Rust
# clients can link either way.
crate-type = ["cdylib", "staticlib"]

[dependencies]
libc = "0.2.135"

tdigest = { path="../t-digest" }
56 changes: 56 additions & 0 deletions crates/t-digest-lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// There is no safety here: it's all in the hands of the caller, bless their heart.
#![allow(clippy::missing_safety_doc)]

/// Allocate a new t-digest `Builder` with the given size and transfer
/// ownership to the C caller as an opaque pointer.
///
/// Release with `timescaledb_toolkit_tdigest_builder_free` (or consume via
/// `timescaledb_toolkit_tdigest_build` / `_merge`).
#[no_mangle]
pub extern "C" fn timescaledb_toolkit_tdigest_builder_with_size(
    size: usize,
) -> Box<tdigest::Builder> {
    let builder = tdigest::Builder::with_size(size);
    Box::new(builder)
}

/// Add `value` to the builder behind `builder`.
///
/// # Safety
/// `builder` must be a valid, non-null pointer to a live `Builder` obtained
/// from `timescaledb_toolkit_tdigest_builder_with_size` and not yet freed,
/// and must not be aliased by another thread during this call.
#[no_mangle]
pub unsafe extern "C" fn timescaledb_toolkit_tdigest_push(
    builder: *mut tdigest::Builder,
    value: f64,
) {
    (*builder).push(value)
}

/// Merge the contents of `other` into `builder`, consuming (and freeing)
/// `other`. Panics if the two builders were created with different sizes
/// (see the `assert_eq!` in `Builder::merge`).
///
/// # Safety
/// `builder` must be a valid, non-null pointer to a live `Builder`.
/// `other` must be an owned builder pointer obtained from this library;
/// after this call it is dropped and must not be used or freed again.
#[no_mangle]
pub unsafe extern "C" fn timescaledb_toolkit_tdigest_merge(
    builder: *mut tdigest::Builder,
    other: Box<tdigest::Builder>,
) {
    let other = *other;
    (*builder).merge(other)
}

/// Reclaim and drop a `Builder` previously handed to the C caller.
/// The pointer must not be used again after this call.
#[no_mangle]
pub extern "C" fn timescaledb_toolkit_tdigest_builder_free(builder: Box<tdigest::Builder>) {
    drop(builder);
}

/// Consume the builder, fold in any buffered values, and return the finished
/// `TDigest` as an owned pointer. Release the result with
/// `timescaledb_toolkit_tdigest_free`.
#[no_mangle]
pub extern "C" fn timescaledb_toolkit_tdigest_build(
    mut builder: Box<tdigest::Builder>,
) -> Box<tdigest::TDigest> {
    let digest = builder.build();
    Box::new(digest)
}

/// Reclaim and drop a `TDigest` previously returned by
/// `timescaledb_toolkit_tdigest_build`.
#[no_mangle]
pub extern "C" fn timescaledb_toolkit_tdigest_free(digest: Box<tdigest::TDigest>) {
    drop(digest);
}

// TODO Messy, but good enough to experiment with. We might want to
// into_raw_parts the String and offer a transparent struct containing pointer
// to and size of the buffer, with a ts_tk_tdigest_string_free taking it back
// and releasing it. That also avoids one copy.
/// Serialize `td` into a NUL-terminated C string allocated with
/// `libc::malloc`. The caller owns the result and must release it with
/// `libc::free`. Returns NULL if allocation fails.
///
/// # Safety
/// `td` must be a valid, non-null pointer to a live `TDigest`.
#[no_mangle]
pub unsafe extern "C" fn timescaledb_toolkit_tdigest_format_for_postgres(
    td: *const tdigest::TDigest,
) -> *mut libc::c_char {
    let s = (*td).format_for_postgres();
    // One extra byte for the terminating NUL.
    let buf = libc::malloc(s.len() + 1) as *mut libc::c_char;
    // Bug fix: the original wrote through the pointer without checking for
    // allocation failure — writing through a NULL malloc result is UB.
    if buf.is_null() {
        return std::ptr::null_mut();
    }
    std::ptr::copy_nonoverlapping(s.as_ptr() as *const libc::c_char, buf, s.len());
    *buf.add(s.len()) = 0;
    buf
}
1 change: 1 addition & 0 deletions crates/t-digest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
flat_serialize = {path="../flat_serialize/flat_serialize"}
flat_serialize_macro = {path="../flat_serialize/flat_serialize_macro"}
ordered-float = {version = "1.0", features = ["serde"] }
ron = "0.6.0"
serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
Expand Down
88 changes: 88 additions & 0 deletions crates/t-digest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,35 @@ impl TDigest {
/// Number of centroids currently held by this digest.
pub fn num_buckets(&self) -> usize {
    self.centroids.len()
}

/// Serialize this digest as the ron-encoded text accepted by the
/// timescaledb-toolkit extension's `tdigest` type (its version-1 on-disk
/// format), suitable for direct use in an INSERT statement.
pub fn format_for_postgres(&self) -> String {
    /// Mimics the version-1 serialization format the extension uses. TODO don't!
    #[derive(Serialize)]
    struct Hack {
        version: u32,
        buckets: usize,
        max_buckets: usize,
        count: u64,
        sum: f64,
        min: f64,
        max: f64,
        centroids: Vec<Centroid>,
    }

    let max_buckets = self.max_size();
    let centroids = self.raw_centroids();
    // NOTE: serde emits fields in *declaration* order (version, buckets,
    // max_buckets, ...); the literal's field order below does not matter.
    ron::to_string(&Hack {
        version: 1,
        max_buckets,
        buckets: centroids.len(),
        count: self.count(),
        sum: self.sum(),
        min: self.min(),
        max: self.max(),
        centroids: centroids.to_vec(),
    })
    // Serializing a plain struct of numbers/centroids to a String is not
    // expected to fail.
    .unwrap()
}
}

impl Default for TDigest {
Expand Down Expand Up @@ -649,6 +678,65 @@ impl TDigest {
}
}

// This is a tdigest object paired
// with a vector of values that still need to be inserted.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Builder {
    // Values accumulated since the last digest pass; not serialized, so a
    // deserialized Builder starts with an empty buffer.
    #[serde(skip)]
    buffer: Vec<f64>,
    // The digest built from all values flushed so far.
    digested: TDigest,
}

impl From<TDigest> for Builder {
fn from(digested: TDigest) -> Self {
Self {
digested,
..Default::default()
}
}
}

impl Builder {
    /// Create a builder whose digest holds at most `size` buckets.
    pub fn with_size(size: usize) -> Self {
        Self::from(TDigest::new_with_size(size))
    }

    /// Add a new value, recalculating the digest if we've crossed a threshold.
    // TODO threshold is currently set to number of digest buckets, should this be adjusted
    pub fn push(&mut self, value: f64) {
        self.buffer.push(value);
        if self.buffer.len() >= self.digested.max_size() {
            self.digest()
        }
    }

    // Update the digest with all accumulated values.
    fn digest(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        let new = std::mem::take(&mut self.buffer);
        self.digested = self.digested.merge_unsorted(new)
    }

    /// Flush any buffered values and return the finished digest, leaving this
    /// builder empty (reusable via `Default`).
    pub fn build(&mut self) -> TDigest {
        self.digest();
        std::mem::take(&mut self.digested)
    }

    /// Merge `other` into `self`, including both builders' buffered values.
    /// Panics if the two builders were created with different sizes.
    pub fn merge(&mut self, other: Self) {
        assert_eq!(self.digested.max_size(), other.digested.max_size());
        // Bug fix: `merge_unsorted` returns a *new* digest (see `digest()`
        // above); the original called it on `digvec[0]`/`digvec[1]` and
        // discarded the result, silently dropping all buffered values (and
        // would not have compiled, since `digvec` was not `mut`). Reassign
        // the merged digests instead.
        let mut mine = std::mem::take(&mut self.digested);
        if !self.buffer.is_empty() {
            mine = mine.merge_unsorted(std::mem::take(&mut self.buffer));
        }
        let mut theirs = other.digested;
        if !other.buffer.is_empty() {
            theirs = theirs.merge_unsorted(other.buffer);
        }
        self.digested = TDigest::merge_digests(vec![mine, theirs]);
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
128 changes: 128 additions & 0 deletions docs/client.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Client-side aggregation [<sup><mark>experimental</mark></sup>](/docs/README.md#tag-notes)

- Current status: prototype
- Effort remaining: lots

## Purpose

We have long suspected it might be valuable to allow building aggregates
client-side rather than requiring all data be stored in postgres and
aggregated within the toolkit.

https://github.com/timescale/timescaledb-toolkit/issues/485 recently came in
adding weight to this idea. Because this customer requests tdigest, that's
what we'll use for prototyping.

## Use cases

Quoting the above customer:

"In some cases it is not possible to transfer all the non-aggregated data to
TimescaleDB due to it's amount and/or limited connectivity."

## Questions

- Do we want to support a public crate?
- What does that mean?
- Do we need to monitor an email address?
- What promise would we make on response time?
- Is this materially different from what we've already signed up for by
publishing on github?
- How do we handle ownership of the crates.io credentials?

- Which license do we use?
- Some of our code is already a derived work - do we permissively license it
all, or restrict some of it?

- Wire protocol maintenance
- This is a problem we already have, we just didn't realize it, as it is
already possible to construct our aggregates and INSERT them, and they
also appear in pg dumps; at the moment, you can restore those dumps, though we
haven't made any promise about it. On our stabilized aggregates, users
may assume that is stabilized, too.
- Is there a practical concern here? Or do we just say "not supported"?
- Is it possible to crash the extension with invalid inputs?
- If we commit to a public wire protocol, shouldn't we avoid the
Rust-specific ron and go for something more common?

## Proposal

As a first step, build a crate which externalizes tdigest aggregate creation
(this is prototyped on eg/client branch):

```rust
let mut digester = tdigest::Builder::with_size(N);
loop {
digester.push(value);
}
send_to_postgres(format!("INSERT INTO digests VALUES ({})", digester.build().format_for_postgres()));
```

A good second project would be to add sample projects calling it from popular
languages such as C++ and Python.

In order to provide that API, we must first reorganize the tdigest
implementation so that all business logic is in the tdigest crate. Some is
currently in the pgx extension crate.

For each aggregate, the transient state is actually a Builder pattern hidden
behind pgx machinery.

On this branch, I've moved TDigestTransState into tdigest::Builder.

Currently, we use default ron behavior to serialize the raw implementation
details of the pg_type. Users can insert inconsistent data now, and it
doesn't look like we validate that at insertion time.

We should reconsider this for all pg_types regardless of the overall client
project. Is it possible NOT to offer serialized insertion at all? If so,
turning that off would be a good first step.

Then we can enable it just where we want to.

We should put more thought into the serialization format we intentionally
support. Currently it contains redundancy which we can eliminate by
implementing serialization carefully rather than relying on defaults.

## Proof of concept

This is a simple demonstration of inserting serialized tdigest into a table,
showing that it works the same way as an aggregate built by the extension.

```SQL ,non-transactional
CREATE TABLE test (data DOUBLE PRECISION);
INSERT INTO test SELECT generate_series(0.01, 1, 0.01);

CREATE VIEW digest AS SELECT tdigest(100, data) FROM test;

CREATE TABLE digest2 (tdigest tdigest);
INSERT INTO digest2 VALUES ('(version:1,max_buckets:100,count:100,sum:50.50000000000001,min:0.01,max:1,centroids:[(mean:0.01,weight:1),(mean:0.02,weight:1),(mean:0.03,weight:1),(mean:0.04,weight:1),(mean:0.05,weight:1),(mean:0.06,weight:1),(mean:0.07,weight:1),(mean:0.08,weight:1),(mean:0.09,weight:1),(mean:0.1,weight:1),(mean:0.11,weight:1),(mean:0.12,weight:1),(mean:0.13,weight:1),(mean:0.14,weight:1),(mean:0.15,weight:1),(mean:0.16,weight:1),(mean:0.17,weight:1),(mean:0.18,weight:1),(mean:0.19,weight:1),(mean:0.2,weight:1),(mean:0.21,weight:1),(mean:0.22,weight:1),(mean:0.23,weight:1),(mean:0.24,weight:1),(mean:0.25,weight:1),(mean:0.26,weight:1),(mean:0.27,weight:1),(mean:0.28,weight:1),(mean:0.29,weight:1),(mean:0.3,weight:1),(mean:0.31,weight:1),(mean:0.32,weight:1),(mean:0.33,weight:1),(mean:0.34,weight:1),(mean:0.35,weight:1),(mean:0.36,weight:1),(mean:0.37,weight:1),(mean:0.38,weight:1),(mean:0.39,weight:1),(mean:0.4,weight:1),(mean:0.41,weight:1),(mean:0.42,weight:1),(mean:0.43,weight:1),(mean:0.44,weight:1),(mean:0.45,weight:1),(mean:0.46,weight:1),(mean:0.47,weight:1),(mean:0.48,weight:1),(mean:0.49,weight:1),(mean:0.5,weight:1),(mean:0.51,weight:1),(mean:0.525,weight:2),(mean:0.545,weight:2),(mean:0.565,weight:2),(mean:0.585,weight:2),(mean:0.605,weight:2),(mean:0.625,weight:2),(mean:0.64,weight:1),(mean:0.655,weight:2),(mean:0.675,weight:2),(mean:0.69,weight:1),(mean:0.705,weight:2),(mean:0.72,weight:1),(mean:0.735,weight:2),(mean:0.75,weight:1),(mean:0.76,weight:1),(mean:0.775,weight:2),(mean:0.79,weight:1),(mean:0.8,weight:1),(mean:0.815,weight:2),(mean:0.83,weight:1),(mean:0.84,weight:1),(mean:0.85,weight:1),(mean:0.86,weight:1),(mean:0.87,weight:1),(mean:0.88,weight:1),(mean:0.89,weight:1),(mean:0.9,weight:1),(mean:0.91,weight:1),(mean:0.92,weight:1),(mean:0.93,weight:1),(mean:0.94,weight:1),(mean:0.95,weight:1),(mean:0.96,weight:1),(mean:0.97,weight:1),(mean:0.98,weight:1),(mean:0.99,weight:1),(mean:1,weight:1)])');
```

```SQL
SELECT
min_val(tdigest),
max_val(tdigest),
num_vals(tdigest)
FROM digest;
```
```output
min_val | max_val | num_vals
---------+---------+----------
0.01 | 1 | 100
```

Inserting serialized tdigest into table behaves the same:

```SQL
SELECT
min_val(tdigest),
max_val(tdigest),
num_vals(tdigest)
FROM digest2;
```
```output
min_val | max_val | num_vals
---------+---------+----------
0.01 | 1 | 100
```
Loading

0 comments on commit e0e62e1

Please sign in to comment.