diff --git a/CHANGELOG.md b/CHANGELOG.md index 787ca72..82916ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate ### Added - derive: support `serde::skip_deserializing` ([#83]). +- the `quanta` feature, enabled by default. ### Changed - **BREAKING** drop the `wa-37420` feature. +- inserter: increase performance if the `quanta` feature is enabled. - derive: move to syn v2. ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 68382cd..55e4dc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ required-features = ["test-util"] debug = true [features] -default = ["lz4", "tls"] +default = ["lz4", "tls", "quanta"] test-util = ["hyper/server"] watch = ["dep:sha-1", "dep:serde_json"] @@ -42,6 +42,7 @@ lz4 = ["dep:lz4", "dep:clickhouse-rs-cityhash-sys"] uuid = ["dep:uuid"] time = ["dep:time"] tls = ["dep:hyper-tls"] +quanta = ["dep:quanta"] [dependencies] clickhouse-derive = { version = "0.1.1", path = "derive" } @@ -63,6 +64,7 @@ clickhouse-rs-cityhash-sys = { version = "0.1.2", optional = true } uuid = { version = "1", optional = true } time = { version = "0.3", optional = true } bstr = { version = "1.2", default-features = false } +quanta = { version = "0.12", optional = true } [dev-dependencies] criterion = "0.5.0" diff --git a/README.md b/README.md index dc35100..46fe75e 100644 --- a/README.md +++ b/README.md @@ -209,10 +209,11 @@ See [examples](https://github.com/loyd/clickhouse.rs/tree/master/examples). ## Feature Flags * `lz4` (enabled by default) — enables `Compression::Lz4` and `Compression::Lz4Hc(_)` variants. If enabled, `Compression::Lz4` is used by default for all queries except for `WATCH`. * `tls` (enabled by default) — supports urls with the `HTTPS` schema. +* `quanta` (enabled by default) - uses the [`quanta`](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests). * `test-util` — adds mocks. See [the example](https://github.com/loyd/clickhouse.rs/tree/master/examples/mock.rs). Use it only in `dev-dependencies`. * `watch` — enables `client.watch` functionality. See the corresponding section for details. -* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid/latest/uuid/) crate. -* `time` — adds `serde::time` to work with [time](https://docs.rs/time/latest/time/) crate. +* `uuid` — adds `serde::uuid` to work with [uuid](https://docs.rs/uuid) crate. +* `time` — adds `serde::time` to work with [time](https://docs.rs/time) crate. ## Data Types * `(U)Int(8|16|32|64|128)` maps to/from corresponding `(u|i)(8|16|32|64|128)` types or newtypes around them. diff --git a/src/inserter.rs b/src/inserter.rs index 7cd13e2..d89231c 100644 --- a/src/inserter.rs +++ b/src/inserter.rs @@ -5,9 +5,15 @@ use futures::{ Future, }; use serde::Serialize; -use tokio::time::{Duration, Instant}; - -use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client}; +use tokio::time::Duration; + +use crate::{ + error::Result, + insert::Insert, + row::Row, + ticks::{self, Ticks}, + Client, +}; const DEFAULT_MAX_ENTRIES: u64 = 500_000; @@ -154,7 +160,7 @@ where Some( self.ticks .next_at()? - .saturating_duration_since(Instant::now()), + .saturating_duration_since(ticks::Instant::now()), ) } @@ -184,7 +190,7 @@ where self.uncommitted_entries = 0; } - let now = Instant::now(); + let now = ticks::Instant::now(); Ok(if self.is_threshold_reached(now) { let quantities = mem::replace(&mut self.committed, Quantities::ZERO); @@ -207,7 +213,7 @@ where Ok(self.committed) } - fn is_threshold_reached(&self, now: Instant) -> bool { + fn is_threshold_reached(&self, now: ticks::Instant) -> bool { self.committed.entries >= self.max_entries || self.ticks.next_at().map_or(false, |next_at| now >= next_at) } diff --git a/src/ticks.rs b/src/ticks.rs index 37d2157..e8756ae 100644 --- a/src/ticks.rs +++ b/src/ticks.rs @@ -1,7 +1,18 @@ -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; const PERIOD_THRESHOLD: Duration = Duration::from_secs(365 * 24 * 3600); +// === Instant === + +// More efficient `Instant` based on TSC. +#[cfg(all(feature = "quanta", not(feature = "test-util")))] +pub(crate) type Instant = quanta::Instant; + +#[cfg(any(not(feature = "quanta"), feature = "test-util"))] +pub(crate) type Instant = tokio::time::Instant; + +// === Ticks === + pub(crate) struct Ticks { period: Duration, max_bias: f64, @@ -62,9 +73,9 @@ impl Ticks { } } -#[tokio::test] +#[cfg(feature = "test-util")] // only with `tokio::time::Instant` +#[tokio::test(start_paused = true)] async fn it_works() { - tokio::time::pause(); let origin = Instant::now(); // No bias. @@ -101,9 +112,9 @@ async fn it_works() { assert_eq!(ticks.next_at().unwrap() - origin, Duration::from_secs(31)); } -#[tokio::test] +#[cfg(feature = "test-util")] // only with `tokio::time::Instant` +#[tokio::test(start_paused = true)] async fn it_skips_extra_ticks() { - tokio::time::pause(); let origin = Instant::now(); let mut ticks = Ticks::default();