From 23c0348bf053e1da407ea2a9ebdd3598fb01a19b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 21 Jul 2023 12:49:38 -0500 Subject: [PATCH] Add clickbench query runner to benchmarks, update docs --- benchmarks/README.md | 92 +++++++++----- benchmarks/bench.sh | 13 +- benchmarks/queries/clickbench/README.txt | 1 + benchmarks/queries/clickbench/queries.sql | 43 +++++++ benchmarks/src/bin/dfbench.rs | 4 +- benchmarks/src/bin/tpch.rs | 5 +- benchmarks/src/clickbench.rs | 144 ++++++++++++++++++++++ benchmarks/src/lib.rs | 25 +--- benchmarks/src/options.rs | 54 ++++++++ benchmarks/src/tpch/convert.rs | 2 +- benchmarks/src/tpch/run.rs | 49 +++++--- 11 files changed, 353 insertions(+), 79 deletions(-) create mode 100644 benchmarks/queries/clickbench/README.txt create mode 100644 benchmarks/queries/clickbench/queries.sql create mode 100644 benchmarks/src/clickbench.rs create mode 100644 benchmarks/src/options.rs diff --git a/benchmarks/README.md b/benchmarks/README.md index cf8a20a823f5..b5c767cc5cc6 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -20,11 +20,15 @@ # DataFusion Benchmarks This crate contains benchmarks based on popular public data sets and -open source benchmark suites, making it easy to run more realistic -benchmarks to help with performance and scalability testing of DataFusion. +open source benchmark suites, to help with performance and scalability +testing of DataFusion. + # Benchmarks Against Other Engines +This crate is used to benchmark changes to DataFusion itself, rather +than benchmarking against another engine. + DataFusion is included in the benchmark setups for several popular benchmarks that compare performance with other engines. For example: @@ -38,9 +42,8 @@ benchmarks that compare performance with other engines. For example: ## Running Benchmarks -The easiest way to run benchmarks from DataFusion source checkouts is -to use the [bench.sh](bench.sh) script. 
Usage instructions can be -found with: +The easiest way to run benchmarks is the [bench.sh](bench.sh) +script. Usage instructions can be found with: ```shell # show usage @@ -49,17 +52,23 @@ found with: ## Generating Data -You can create data for all these benchmarks using the [bench.sh](bench.sh) script: +You can create / download the data for these benchmarks using the [bench.sh](bench.sh) script: + +Create / download all datasets ```shell ./bench.sh data ``` -Data is generated in the `data` subdirectory and will not be checked -in because this directory has been added to the `.gitignore` file. +Create / download a specific dataset (TPCH) + +```shell +./bench.sh data tpch +``` +Data is placed in the `data` subdirectory. -## Example to compare peformance on main to a branch +## Comparing performance on main to a branch ```shell git checkout main @@ -143,40 +152,28 @@ Benchmark tpch_mem.json ``` -# Benchmark Descriptions: - -## `tpch` Benchmark derived from TPC-H - -These benchmarks are derived from the [TPC-H][1] benchmark. And we use this repo as the source of tpch-gen and answers: -https://github.com/databricks/tpch-dbgen.git, based on [2.17.1](https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf) version of TPC-H. +### Running Benchmarks Manually - -### Running the DataFusion Benchmarks Manually - -The benchmark can then be run (assuming the data created from `dbgen` is in `./data`) with a command such as: +Assuming the data is created in the `data` directory, the `tpch` benchmark can be run like: ```bash -cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 +cargo run --release --bin dfbench -- tpch --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` -If you omit `--query=` argument, then all benchmarks will be run one by one (from query 1 to query 22). 
+If you omit `--query=` argument, then all 22 queries will be run ```bash -cargo run --release --bin tpch -- benchmark datafusion --iterations 1 --path ./data --format tbl --batch-size 4096 +cargo run --release --bin dfbench -- tpch --iterations 1 --path ./data --format tbl --batch-size 4096 ``` +### Different features + You can enable the features `simd` (to use SIMD instructions, `cargo nightly` is required.) and/or `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`: ``` cargo run --release --features "simd mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096 ``` -If you want to disable collection of statistics (and thus cost based optimizers), you can pass `--disable-statistics` flag. - -```bash -cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path /mnt/tpch-parquet --format parquet --query 17 --disable-statistics -``` - The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl` (generated by the `dbgen` utility) to CSV and Parquet. @@ -188,9 +185,10 @@ Or if you want to verify and run all the queries in the benchmark, you can just ### Comparing results between runs -Any `tpch` execution with `-o ` argument will produce a summary file right under the `` -directory. It is a JSON serialized form of all the runs that happened as well as the runtime metadata -(number of cores, DataFusion version, etc.). +Any `dfbench` execution with `-o ` argument will produce a +summary JSON in the specified directory. This file contains a +serialized form of all the runs that happened and runtime +metadata (number of cores, DataFusion version, etc.). 
```shell $ git checkout main @@ -253,6 +251,38 @@ Query 1 iteration 0 took 1956.1 ms Query 1 avg time: 1956.11 ms ``` +# Benchmark Descriptions + +## `dfbench` + +The `dfbench` program contains subcommands to run various benchmarks. + +Full help can be found in the relevant sub command. For example to get help for tpch, +run `cargo run --bin dfbench tpch --help` + +```shell +cargo run --bin dfbench --help + +cargo run --bin dfbench -- --help + Finished dev [unoptimized + debuginfo] target(s) in 0.29s + Running `/Users/alamb/Software/target-df2/debug/dfbench --help` +datafusion-benchmarks 27.0.0 +benchmark command + +USAGE: + dfbench + +SUBCOMMANDS: + clickbench Run the clickbench benchmark + help Prints this message or the help of the given subcommand(s) + tpch Run the tpch benchmark. + tpch-convert Convert tpch .slt files to .parquet or .csv files + +``` + + + + ## NYC Taxi Benchmark These benchmarks are based on the [New York Taxi and Limousine Commission][2] data set. diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index f71094a42549..392935d937e7 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -35,7 +35,8 @@ BENCHMARK=all DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} #CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} -CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # TEMP: for faster iterations +#CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --profile release-nonlto"} # TEMP: for faster iterations +CARGO_COMMAND=${CARGO_COMMAND:-"cargo run "} # TEMP: for faster iterations usage() { echo " @@ -386,12 +387,18 @@ data_clickbench_partitioned() { # Runs the clickbench benchmark with a single large parquet file run_clickbench_1() { - echo "NOTICE: ClickBench (1 parquet file) is not yet supported" + RESULTS_FILE="${RESULTS_DIR}/clickbench_1.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running clickbench (1 file) benchmark..." 
+ $CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} } # Runs the clickbench benchmark with a single large parquet file run_clickbench_partitioned() { - echo "NOTICE: ClickBench (1 parquet file) is not yet supported" + RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running clickbench (partitioned, 100 files) benchmark..." + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 10 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o ${RESULTS_FILE} } compare_benchmarks() { diff --git a/benchmarks/queries/clickbench/README.txt b/benchmarks/queries/clickbench/README.txt new file mode 100644 index 000000000000..b46900956e54 --- /dev/null +++ b/benchmarks/queries/clickbench/README.txt @@ -0,0 +1 @@ +Downloaded from https://github.com/ClickHouse/ClickBench/blob/main/datafusion/queries.sql diff --git a/benchmarks/queries/clickbench/queries.sql b/benchmarks/queries/clickbench/queries.sql new file mode 100644 index 000000000000..52e72e02e1e0 --- /dev/null +++ b/benchmarks/queries/clickbench/queries.sql @@ -0,0 +1,43 @@ +SELECT COUNT(*) FROM hits; +SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0; +SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits; +SELECT AVG("UserID") FROM hits; +SELECT COUNT(DISTINCT "UserID") FROM hits; +SELECT COUNT(DISTINCT "SearchPhrase") FROM hits; +SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits; +SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC; +SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10; +SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10; +SELECT 
"MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhoneModel" ORDER BY u DESC LIMIT 10; +SELECT "MobilePhone", "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhone", "MobilePhoneModel" ORDER BY u DESC LIMIT 10; +SELECT "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; +SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10; +SELECT "SearchEngineID", "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "SearchPhrase" ORDER BY c DESC LIMIT 10; +SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10; +SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; +SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10; +SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; +SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449; +SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%'; +SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; +SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; +SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; +SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; +SELECT 
"SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10; +SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10; +SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; +SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("ResolutionWidth" + 17), SUM("ResolutionWidth" + 18), SUM("ResolutionWidth" + 19), SUM("ResolutionWidth" + 20), SUM("ResolutionWidth" + 21), SUM("ResolutionWidth" + 22), SUM("ResolutionWidth" + 23), SUM("ResolutionWidth" + 24), SUM("ResolutionWidth" + 25), SUM("ResolutionWidth" + 26), SUM("ResolutionWidth" + 27), SUM("ResolutionWidth" + 28), SUM("ResolutionWidth" + 29), SUM("ResolutionWidth" + 30), SUM("ResolutionWidth" + 31), SUM("ResolutionWidth" + 32), SUM("ResolutionWidth" + 33), SUM("ResolutionWidth" + 34), SUM("ResolutionWidth" + 35), SUM("ResolutionWidth" + 36), SUM("ResolutionWidth" + 37), SUM("ResolutionWidth" + 38), SUM("ResolutionWidth" + 39), SUM("ResolutionWidth" + 40), SUM("ResolutionWidth" + 41), SUM("ResolutionWidth" + 42), SUM("ResolutionWidth" + 43), SUM("ResolutionWidth" + 44), SUM("ResolutionWidth" + 45), SUM("ResolutionWidth" + 46), SUM("ResolutionWidth" + 47), 
SUM("ResolutionWidth" + 48), SUM("ResolutionWidth" + 49), SUM("ResolutionWidth" + 50), SUM("ResolutionWidth" + 51), SUM("ResolutionWidth" + 52), SUM("ResolutionWidth" + 53), SUM("ResolutionWidth" + 54), SUM("ResolutionWidth" + 55), SUM("ResolutionWidth" + 56), SUM("ResolutionWidth" + 57), SUM("ResolutionWidth" + 58), SUM("ResolutionWidth" + 59), SUM("ResolutionWidth" + 60), SUM("ResolutionWidth" + 61), SUM("ResolutionWidth" + 62), SUM("ResolutionWidth" + 63), SUM("ResolutionWidth" + 64), SUM("ResolutionWidth" + 65), SUM("ResolutionWidth" + 66), SUM("ResolutionWidth" + 67), SUM("ResolutionWidth" + 68), SUM("ResolutionWidth" + 69), SUM("ResolutionWidth" + 70), SUM("ResolutionWidth" + 71), SUM("ResolutionWidth" + 72), SUM("ResolutionWidth" + 73), SUM("ResolutionWidth" + 74), SUM("ResolutionWidth" + 75), SUM("ResolutionWidth" + 76), SUM("ResolutionWidth" + 77), SUM("ResolutionWidth" + 78), SUM("ResolutionWidth" + 79), SUM("ResolutionWidth" + 80), SUM("ResolutionWidth" + 81), SUM("ResolutionWidth" + 82), SUM("ResolutionWidth" + 83), SUM("ResolutionWidth" + 84), SUM("ResolutionWidth" + 85), SUM("ResolutionWidth" + 86), SUM("ResolutionWidth" + 87), SUM("ResolutionWidth" + 88), SUM("ResolutionWidth" + 89) FROM hits; +SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10; +SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; +SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; +SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10; +SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10; +SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, 
COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10; +SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10; +SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10; +SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100; +SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 
2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; +SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000; diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index f4ba8bc975d2..d5a17a2ab54e 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -28,13 +28,14 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::tpch; +use datafusion_benchmarks::{clickbench, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] enum Options { Tpch(tpch::RunOpt), TpchConvert(tpch::ConvertOpt), + Clickbench(clickbench::RunOpt), } // Main benchmark runner entrypoint @@ -45,5 +46,6 @@ pub async fn main() -> Result<()> { match Options::from_args() { Options::Tpch(opt) => opt.run().await, Options::TpchConvert(opt) => opt.run().await, + Options::Clickbench(opt) => opt.run().await, } } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 757136d231ae..95480935700d 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -43,9 +43,10 @@ enum TpchOpt { Convert(tpch::ConvertOpt), } -/// 'tpch' entry point, with tortured command line arguments +/// 'tpch' entry point, with tortured command line arguments. Please +/// use `dbbench` instead. 
/// -/// This is kept to be backwards compatible with the benchmark names prior to +/// Note: this is kept to be backwards compatible with the benchmark names prior to /// #[tokio::main] async fn main() -> Result<()> { diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs new file mode 100644 index 000000000000..416f045b245e --- /dev/null +++ b/benchmarks/src/clickbench.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{path::PathBuf, time::Instant}; + +use datafusion::{ + error::{DataFusionError, Result}, + prelude::SessionContext, +}; +use structopt::StructOpt; + +use crate::{BenchmarkRun, CommonOpt}; + +/// Run the clickbench benchmark +/// +/// The ClickBench[1] benchmarks are widely cited in the industry and +/// focus on grouping / aggregation / filtering. This runner uses the +/// scripts and queries from [2]. +/// +/// [1]: https://github.com/ClickHouse/ClickBench +/// [2]: https://github.com/ClickHouse/ClickBench/tree/main/datafusion +#[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] +pub struct RunOpt { + /// Query number. 
If not specified, runs all queries + #[structopt(short, long)] + query: Option<usize>, + + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// Path to hits.parquet (single file) or `hits_partitioned` + /// (partitioned, 100 files) + #[structopt( + parse(from_os_str), + short = "p", + long = "path", + default_value = "benchmarks/data/hits.parquet" + )] + path: PathBuf, + + /// Path to queries.sql (single file) + #[structopt( + parse(from_os_str), + short = "r", + long = "queries-path", + default_value = "benchmarks/queries/clickbench/queries.sql" + )] + queries_path: PathBuf, + + /// If present, write results json here + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option<PathBuf>, +} + +const CLICKBENCH_QUERY_START_ID: usize = 1; +const CLICKBENCH_QUERY_END_ID: usize = 43; + +impl RunOpt { + pub async fn run(self) -> Result<()> { + println!("Running benchmarks with the following options: {self:?}"); + let query_range = match self.query { + Some(query_id) => query_id..=query_id, + None => CLICKBENCH_QUERY_START_ID..=CLICKBENCH_QUERY_END_ID, + }; + + let config = self.common.config(); + let ctx = SessionContext::with_config(config); + self.register_hits(&ctx).await?; + + let iterations = self.common.iterations(); + let mut benchmark_run = BenchmarkRun::new(); + for query_id in query_range { + benchmark_run.start_new_case(&format!("Query {query_id}")); + let sql = self.get_query(query_id)?; + println!("Q{query_id}: {sql}"); + + for i in 0..iterations { + let start = Instant::now(); + let results = ctx.sql(&sql).await?.collect().await?; + let elapsed = start.elapsed(); + let ms = elapsed.as_secs_f64() * 1000.0; + let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); + println!( + "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" + ); + benchmark_run.write_iter(elapsed, row_count); + } + } + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + Ok(()) + } + + /// Registers the 
`hits.parquet` as a table named `hits` + async fn register_hits(&self, ctx: &SessionContext) -> Result<()> { + let options = Default::default(); + let path = self.path.as_os_str().to_str().unwrap(); + ctx.register_parquet("hits", path, options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'hits' as {path}"), + Box::new(e), + ) + }) + } + + /// Returns the text of query `query_id` + fn get_query(&self, query_id: usize) -> Result { + if query_id == 0 || query_id > 43 { + return Err(DataFusionError::Execution(format!( + "Invalid query id {query_id}. Must be between {CLICKBENCH_QUERY_START_ID} and {CLICKBENCH_QUERY_END_ID}" + ))); + } + + let path = self.queries_path.as_path(); + + // ClickBench has all queries in a single file identified by line number + let all_queries = std::fs::read_to_string(path).map_err(|e| { + DataFusionError::Execution(format!("Could not open {path:?}: {e}")) + })?; + let all_queries: Vec<_> = all_queries.lines().collect(); + + Ok(all_queries + .get(query_id - 1) + .map(|s| s.to_string()) + .unwrap()) + } +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 9d5530d31f68..8cab15111521 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -16,27 +16,10 @@ // under the License. //! 
DataFusion benchmark runner -use datafusion::error::Result; -use structopt::StructOpt; - -pub mod run; +pub mod clickbench; pub mod tpch; +mod options; +mod run; +pub use options::CommonOpt; pub use run::{BenchQuery, BenchmarkRun}; - -#[derive(Debug, StructOpt)] -#[structopt(about = "benchmark command")] -enum Options { - Tpch(tpch::RunOpt), - TpchConvert(tpch::ConvertOpt), -} - -// Main benchmark runner entrypoint -pub async fn main() -> Result<()> { - env_logger::init(); - - match Options::from_args() { - Options::Tpch(opt) => opt.run().await, - Options::TpchConvert(opt) => opt.run().await, - } -} diff --git a/benchmarks/src/options.rs b/benchmarks/src/options.rs new file mode 100644 index 000000000000..f9b07b49d06d --- /dev/null +++ b/benchmarks/src/options.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion::prelude::SessionConfig; +use structopt::StructOpt; + + +// Common benchmark options (don't use doc comments otherwise this doc +// shows up in help files) +#[derive(Debug, StructOpt, Clone)] +pub struct CommonOpt { + /// Number of iterations of each test run + #[structopt(short = "i", long = "iterations", default_value = "3")] + iterations: usize, + + /// Number of partitions to process in parallel + #[structopt(short = "n", long = "partitions", default_value = "2")] + partitions: usize, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, +} + +impl CommonOpt { + /// Return an appropriately configured `SessionConfig` + pub fn config(&self) -> SessionConfig { + SessionConfig::new() + .with_target_partitions(self.partitions) + .with_batch_size(self.batch_size) + } + + pub fn iterations(&self) -> usize { + self.iterations + } + + pub fn partitions(&self) -> usize { + self.partitions + } +} diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs index e076097ad771..24631d10e49f 100644 --- a/benchmarks/src/tpch/convert.rs +++ b/benchmarks/src/tpch/convert.rs @@ -57,7 +57,7 @@ pub struct ConvertOpt { } impl ConvertOpt { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { let compression = self.compression()?; let input_path = self.input_path.to_str().unwrap(); diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 6aada30bc712..fb496ddc7c83 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -16,7 +16,7 @@ // under the License. 
use super::get_query_sql; -use crate::BenchmarkRun; +use crate::{BenchmarkRun, CommonOpt}; use arrow::record_batch::RecordBatch; use arrow::util::pretty::{self, pretty_format_batches}; use datafusion::datasource::file_format::csv::{CsvFormat, DEFAULT_CSV_EXTENSION}; @@ -42,8 +42,17 @@ use structopt::StructOpt; use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES}; -/// Run the tpch benchmark +/// Run the tpch benchmark. +/// +/// This benchmarks is derived from the [TPC-H][1] version +/// [2.17.1]. The data and answers are generated using `tpch-gen` from +/// [2]. +/// +/// [1]: http://www.tpc.org/tpch/ +/// [2]: https://github.com/databricks/tpch-dbgen.git, +/// [2.17.1]: https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf #[derive(Debug, StructOpt, Clone)] +#[structopt(verbatim_doc_comment)] pub struct RunOpt { /// Query number. If not specified, runs all queries #[structopt(short, long)] @@ -53,17 +62,9 @@ pub struct RunOpt { #[structopt(short, long)] debug: bool, - /// Number of iterations of each test run - #[structopt(short = "i", long = "iterations", default_value = "3")] - iterations: usize, - - /// Number of partitions to process in parallel - #[structopt(short = "n", long = "partitions", default_value = "2")] - partitions: usize, - - /// Batch size when reading CSV or Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "8192")] - batch_size: usize, + /// Common options + #[structopt(flatten)] + common: CommonOpt, /// Path to data files #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] @@ -90,7 +91,7 @@ const TPCH_QUERY_START_ID: usize = 1; const TPCH_QUERY_END_ID: usize = 22; impl RunOpt { - pub async fn run(&self) -> Result<()> { + pub async fn run(self) -> Result<()> { println!("Running benchmarks with the following options: {self:?}"); let query_range = match self.query { Some(query_id) => query_id..=query_id, @@ -110,9 +111,9 @@ impl RunOpt { } async fn 
benchmark_query(&self, query_id: usize) -> Result> { - let config = SessionConfig::new() - .with_target_partitions(self.partitions) - .with_batch_size(self.batch_size) + let config = self + .common + .config() .with_collect_statistics(!self.disable_statistics); let ctx = SessionContext::with_config(config); @@ -122,7 +123,7 @@ impl RunOpt { let mut millis = vec![]; // run benchmark let mut query_results = vec![]; - for i in 0..self.iterations { + for i in 0..self.iterations() { let start = Instant::now(); let sql = &get_query_sql(query_id)?; @@ -169,7 +170,7 @@ impl RunOpt { println!("Loading table '{table}' into memory"); let start = Instant::now(); let memtable = - MemTable::load(table_provider, Some(self.partitions), &ctx.state()) + MemTable::load(table_provider, Some(self.partitions()), &ctx.state()) .await?; println!( "Loaded table '{}' into memory in {} ms", @@ -231,7 +232,7 @@ impl RunOpt { ) -> Result> { let path = self.path.to_str().unwrap(); let table_format = self.file_format.as_str(); - let target_partitions = self.partitions; + let target_partitions = self.partitions(); // Obtain a snapshot of the SessionState let state = ctx.state(); @@ -283,6 +284,14 @@ impl RunOpt { Ok(Arc::new(ListingTable::try_new(config)?)) } + + fn iterations(&self) -> usize { + self.common.iterations() + } + + fn partitions(&self) -> usize { + self.common.partitions() + } } struct QueryResult {