From 07cd8b713a4ffb1e6bb15eae39ca8d4818c06eba Mon Sep 17 00:00:00 2001 From: lmatz Date: Tue, 11 Apr 2023 20:08:33 +0800 Subject: [PATCH] test: add create and drop sink test for nexmark (#8686) --- ci/scripts/e2e-sink-test.sh | 9 +++++ ci/scripts/e2e-source-test.sh | 10 ++--- ci/workflows/main.yml | 2 +- ci/workflows/pull-request.yml | 2 +- .../nexmark_endless_part1.slt | 0 .../nexmark_endless_part2.slt | 0 .../nexmark_endless_part3.slt | 0 .../nexmark_endless_part4.slt | 0 .../nexmark_endless_part1.slt | 22 +++++++++++ .../nexmark_endless_part2.slt | 22 +++++++++++ .../nexmark_endless_part3.slt | 22 +++++++++++ .../nexmark_endless_part4.slt | 22 +++++++++++ .../nexmark_endless_part5.slt | 22 +++++++++++ .../nexmark_endless_part6.slt | 22 +++++++++++ e2e_test/streaming/nexmark/sinks/q0.slt.part | 5 +++ e2e_test/streaming/nexmark/sinks/q1.slt.part | 10 +++++ e2e_test/streaming/nexmark/sinks/q10.slt.part | 4 ++ .../streaming/nexmark/sinks/q101.slt.part | 16 ++++++++ .../streaming/nexmark/sinks/q102.slt.part | 14 +++++++ .../streaming/nexmark/sinks/q103.slt.part | 13 +++++++ .../streaming/nexmark/sinks/q104.slt.part | 13 +++++++ .../streaming/nexmark/sinks/q105.slt.part | 13 +++++++ .../streaming/nexmark/sinks/q106.slt.part | 20 ++++++++++ e2e_test/streaming/nexmark/sinks/q14.slt.part | 25 +++++++++++++ e2e_test/streaming/nexmark/sinks/q15.slt.part | 19 ++++++++++ e2e_test/streaming/nexmark/sinks/q16.slt.part | 21 +++++++++++ e2e_test/streaming/nexmark/sinks/q17.slt.part | 16 ++++++++ e2e_test/streaming/nexmark/sinks/q18.slt.part | 14 +++++++ e2e_test/streaming/nexmark/sinks/q2.slt.part | 6 +++ e2e_test/streaming/nexmark/sinks/q20.slt.part | 9 +++++ e2e_test/streaming/nexmark/sinks/q21.slt.part | 17 +++++++++ e2e_test/streaming/nexmark/sinks/q22.slt.part | 13 +++++++ e2e_test/streaming/nexmark/sinks/q3.slt.part | 10 +++++ e2e_test/streaming/nexmark/sinks/q4.slt.part | 21 +++++++++++ e2e_test/streaming/nexmark/sinks/q5.slt.part | 36 ++++++++++++++++++ e2e_test/streaming/nexmark/sinks/q7.slt.part | 23 ++++++++++++ e2e_test/streaming/nexmark/sinks/q8.slt.part | 37 +++++++++++++++++++ e2e_test/streaming/nexmark/sinks/q9.slt.part | 14 +++++++ 38 files changed, 535 insertions(+), 9 deletions(-) rename e2e_test/source/{nexmark_endless => nexmark_endless_mvs}/nexmark_endless_part1.slt (100%) rename e2e_test/source/{nexmark_endless => nexmark_endless_mvs}/nexmark_endless_part2.slt (100%) rename e2e_test/source/{nexmark_endless => nexmark_endless_mvs}/nexmark_endless_part3.slt (100%) rename e2e_test/source/{nexmark_endless => nexmark_endless_mvs}/nexmark_endless_part4.slt (100%) create mode 100644 e2e_test/source/nexmark_endless_sinks/nexmark_endless_part1.slt create mode 100644 e2e_test/source/nexmark_endless_sinks/nexmark_endless_part2.slt create mode 100644 e2e_test/source/nexmark_endless_sinks/nexmark_endless_part3.slt create mode 100644 e2e_test/source/nexmark_endless_sinks/nexmark_endless_part4.slt create mode 100644 e2e_test/source/nexmark_endless_sinks/nexmark_endless_part5.slt create mode 100644 e2e_test/source/nexmark_endless_sinks/nexmark_endless_part6.slt create mode 100644 e2e_test/streaming/nexmark/sinks/q0.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q1.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q10.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q101.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q102.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q103.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q104.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q105.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q106.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q14.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q15.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q16.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q17.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q18.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q2.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q20.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q21.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q22.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q3.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q4.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q5.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q7.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q8.slt.part create mode 100644 e2e_test/streaming/nexmark/sinks/q9.slt.part diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 056207ff0d97..651ef7acf45a 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -103,3 +103,12 @@ fi echo "--- Kill cluster" cargo make ci-kill pkill -f connector-node + +echo "--- e2e, ci-1cn-1fe, nexmark endless" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ +cargo make ci-start ci-1cn-1fe +sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_mvs/*.slt' +sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_sinks/*.slt' + +echo "--- Kill cluster" +cargo make ci-kill \ No newline at end of file diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 6470f92d2d7f..63c24535febb 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -54,6 +54,7 @@ node_port=50051 node_timeout=10 echo "--- starting risingwave cluster with connector node" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cargo make ci-start ci-1cn-1fe-with-recovery ./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 & @@ -93,6 +94,7 @@ mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cd psql -h db -U postgres -d cdc_test < ./e2e_test/source/cdc/postgres_cdc_insert.sql # start cluster w/o clean-data +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cargo make dev ci-1cn-1fe-with-recovery echo "wait for recovery finish" sleep 20 @@ -104,14 +106,8 @@ echo "--- Kill cluster" cargo make ci-kill pkill -f connector-node -echo "--- e2e, ci-1cn-1fe, nexmark endless" -cargo make ci-start ci-1cn-1fe -sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless/*.slt' - -echo "--- Kill cluster" -cargo make ci-kill - echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" +RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ cargo make ci-start ci-kafka-plus-pubsub ./scripts/source/prepare_ci_kafka.sh cargo run --bin prepare_ci_pubsub diff --git a/ci/workflows/main.yml b/ci/workflows/main.yml index 8dad1a72eb59..e262fbdcf2e7 100644 --- a/ci/workflows/main.yml +++ b/ci/workflows/main.yml @@ -265,7 +265,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 5 + timeout_in_minutes: 18 - label: "end-to-end iceberg sink test (release mode)" command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-release" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 7f0dbce9b15f..6d52886f75b0 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -154,7 +154,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 5 + timeout_in_minutes: 18 - label: "end-to-end iceberg sink test" command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-dev" diff --git a/e2e_test/source/nexmark_endless/nexmark_endless_part1.slt b/e2e_test/source/nexmark_endless_mvs/nexmark_endless_part1.slt similarity index 100% rename from e2e_test/source/nexmark_endless/nexmark_endless_part1.slt rename to e2e_test/source/nexmark_endless_mvs/nexmark_endless_part1.slt diff --git a/e2e_test/source/nexmark_endless/nexmark_endless_part2.slt b/e2e_test/source/nexmark_endless_mvs/nexmark_endless_part2.slt similarity index 100% rename from e2e_test/source/nexmark_endless/nexmark_endless_part2.slt rename to e2e_test/source/nexmark_endless_mvs/nexmark_endless_part2.slt diff --git a/e2e_test/source/nexmark_endless/nexmark_endless_part3.slt b/e2e_test/source/nexmark_endless_mvs/nexmark_endless_part3.slt similarity index 100% rename from e2e_test/source/nexmark_endless/nexmark_endless_part3.slt rename to e2e_test/source/nexmark_endless_mvs/nexmark_endless_part3.slt diff --git a/e2e_test/source/nexmark_endless/nexmark_endless_part4.slt b/e2e_test/source/nexmark_endless_mvs/nexmark_endless_part4.slt similarity index 100% rename from e2e_test/source/nexmark_endless/nexmark_endless_part4.slt rename to e2e_test/source/nexmark_endless_mvs/nexmark_endless_part4.slt diff --git a/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part1.slt b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part1.slt new file mode 100644 index 000000000000..34327d62e307 --- /dev/null +++ b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part1.slt @@ -0,0 +1,22 @@ +include ../../nexmark/create_sources.slt.part + +include ../../streaming/nexmark/sinks/q0.slt.part +include ../../streaming/nexmark/sinks/q1.slt.part +include ../../streaming/nexmark/sinks/q2.slt.part +include ../../streaming/nexmark/sinks/q3.slt.part + +sleep 20s + +statement ok +drop sink nexmark_q0; + +statement ok +drop sink nexmark_q1; + +statement ok +drop sink nexmark_q2; + +statement ok +drop sink nexmark_q3; + +include ../../nexmark/drop_sources.slt.part diff --git a/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part2.slt b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part2.slt new file mode 100644 index 000000000000..333042033704 --- /dev/null +++ b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part2.slt @@ -0,0 +1,22 @@ +include ../../nexmark/create_sources.slt.part + +include ../../streaming/nexmark/sinks/q4.slt.part +include ../../streaming/nexmark/sinks/q5.slt.part +include ../../streaming/nexmark/sinks/q7.slt.part +include ../../streaming/nexmark/sinks/q8.slt.part + +sleep 20s + +statement ok +drop sink nexmark_q4; + +statement ok +drop sink nexmark_q5; + +statement ok +drop sink nexmark_q7; + +statement ok +drop sink nexmark_q8; + +include ../../nexmark/drop_sources.slt.part diff --git a/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part3.slt b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part3.slt new file mode 100644 index 000000000000..461a5adf736e --- /dev/null +++ b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part3.slt @@ -0,0 +1,22 @@ +include ../../nexmark/create_sources.slt.part + +include ../../streaming/nexmark/sinks/q9.slt.part +include ../../streaming/nexmark/sinks/q10.slt.part +include ../../streaming/nexmark/sinks/q14.slt.part +include ../../streaming/nexmark/sinks/q15.slt.part + +sleep 20s + +statement ok +drop sink nexmark_q9; + +statement ok +drop sink nexmark_q10; + +statement ok +drop sink nexmark_q14; + +statement ok +drop sink nexmark_q15; + +include ../../nexmark/drop_sources.slt.part diff --git a/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part4.slt b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part4.slt new file mode 100644 index 000000000000..de68bcb397f2 --- /dev/null +++ b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part4.slt @@ -0,0 +1,22 @@ +include ../../nexmark/create_sources.slt.part + +include ../../streaming/nexmark/sinks/q16.slt.part +include ../../streaming/nexmark/sinks/q17.slt.part +include ../../streaming/nexmark/sinks/q18.slt.part +include ../../streaming/nexmark/sinks/q20.slt.part + +sleep 20s + +statement ok +drop sink nexmark_q16; + +statement ok +drop sink nexmark_q17; + +statement ok +drop sink nexmark_q18; + +statement ok +drop sink nexmark_q20; + +include ../../nexmark/drop_sources.slt.part diff --git a/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part5.slt b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part5.slt new file mode 100644 index 000000000000..5dedb487b8ce --- /dev/null +++ b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part5.slt @@ -0,0 +1,22 @@ +include ../../nexmark/create_sources.slt.part + +include ../../streaming/nexmark/sinks/q21.slt.part +include ../../streaming/nexmark/sinks/q22.slt.part +include ../../streaming/nexmark/sinks/q101.slt.part +include ../../streaming/nexmark/sinks/q102.slt.part + +sleep 20s + +statement ok +drop sink nexmark_q21; + +statement ok +drop sink nexmark_q22; + +statement ok +drop sink nexmark_q101; + +statement ok +drop sink nexmark_q102; + +include ../../nexmark/drop_sources.slt.part diff --git a/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part6.slt b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part6.slt new file mode 100644 index 000000000000..b251dc4a0e8e --- /dev/null +++ b/e2e_test/source/nexmark_endless_sinks/nexmark_endless_part6.slt @@ -0,0 +1,22 @@ +include ../../nexmark/create_sources.slt.part + +include ../../streaming/nexmark/sinks/q103.slt.part +include ../../streaming/nexmark/sinks/q104.slt.part +include ../../streaming/nexmark/sinks/q105.slt.part +include ../../streaming/nexmark/sinks/q106.slt.part + +sleep 20s + +statement ok +drop sink nexmark_q103; + +statement ok +drop sink nexmark_q104; + +statement ok +drop sink nexmark_q105; + +statement ok +drop sink nexmark_q106; + +include ../../nexmark/drop_sources.slt.part diff --git a/e2e_test/streaming/nexmark/sinks/q0.slt.part b/e2e_test/streaming/nexmark/sinks/q0.slt.part new file mode 100644 index 000000000000..ac63da8a5984 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q0.slt.part @@ -0,0 +1,5 @@ +statement ok +CREATE SINK nexmark_q0 +AS +SELECT auction, bidder, price, date_time FROM bid +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q1.slt.part b/e2e_test/streaming/nexmark/sinks/q1.slt.part new file mode 100644 index 000000000000..8c08554e1eeb --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q1.slt.part @@ -0,0 +1,10 @@ +statement ok +CREATE SINK nexmark_q1 +AS +SELECT + auction, + bidder, + 0.908 * price as price, + date_time +FROM bid +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q10.slt.part b/e2e_test/streaming/nexmark/sinks/q10.slt.part new file mode 100644 index 000000000000..dbb252da6404 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q10.slt.part @@ -0,0 +1,4 @@ +statement ok +CREATE SINK nexmark_q10 AS +SELECT auction, bidder, price, date_time, TO_CHAR(date_time, 'YYYY-MM-DD') as date, TO_CHAR(date_time, 'HH:MI') as time FROM bid +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q101.slt.part b/e2e_test/streaming/nexmark/sinks/q101.slt.part new file mode 100644 index 000000000000..0287ddb14efe --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q101.slt.part @@ -0,0 +1,16 @@ +statement ok +CREATE SINK nexmark_q101 +AS +SELECT + a.id AS auction_id, + a.item_name AS auction_item_name, + b.max_price AS current_highest_bid +FROM auction a +LEFT OUTER JOIN ( + SELECT + b1.auction, + MAX(b1.price) max_price + FROM bid b1 + GROUP BY b1.auction +) b ON a.id = b.auction +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); \ No newline at end of file diff --git a/e2e_test/streaming/nexmark/sinks/q102.slt.part b/e2e_test/streaming/nexmark/sinks/q102.slt.part new file mode 100644 index 000000000000..b1f84edf1060 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q102.slt.part @@ -0,0 +1,14 @@ +statement ok +CREATE SINK nexmark_q102 +AS +SELECT + a.id AS auction_id, + a.item_name AS auction_item_name, + COUNT(b.auction) AS bid_count +FROM auction a +JOIN bid b ON a.id = b.auction +GROUP BY a.id, a.item_name +HAVING COUNT(b.auction) >= ( + SELECT COUNT(*) / COUNT(DISTINCT auction) FROM bid +) +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); \ No newline at end of file diff --git a/e2e_test/streaming/nexmark/sinks/q103.slt.part b/e2e_test/streaming/nexmark/sinks/q103.slt.part new file mode 100644 index 000000000000..b0982485ecdc --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q103.slt.part @@ -0,0 +1,13 @@ +statement ok +CREATE SINK nexmark_q103 +AS +SELECT + a.id AS auction_id, + a.item_name AS auction_item_name +FROM auction a +WHERE a.id IN ( + SELECT b.auction FROM bid b + GROUP BY b.auction + HAVING COUNT(*) >= 20 +) +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); \ No newline at end of file diff --git a/e2e_test/streaming/nexmark/sinks/q104.slt.part b/e2e_test/streaming/nexmark/sinks/q104.slt.part new file mode 100644 index 000000000000..e1b964434fe2 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q104.slt.part @@ -0,0 +1,13 @@ +statement ok +CREATE SINK nexmark_q104 +AS +SELECT + a.id AS auction_id, + a.item_name AS auction_item_name +FROM auction a +WHERE a.id NOT IN ( + SELECT b.auction FROM bid b + GROUP BY b.auction + HAVING COUNT(*) < 20 +) +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); \ No newline at end of file diff --git a/e2e_test/streaming/nexmark/sinks/q105.slt.part b/e2e_test/streaming/nexmark/sinks/q105.slt.part new file mode 100644 index 000000000000..3434a89963c1 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q105.slt.part @@ -0,0 +1,13 @@ +statement ok +CREATE SINK nexmark_q105 +AS +SELECT + a.id AS auction_id, + a.item_name AS auction_item_name, + COUNT(b.auction) AS bid_count +FROM auction a +JOIN bid b ON a.id = b.auction +GROUP BY a.id, a.item_name +ORDER BY bid_count DESC +LIMIT 1000 +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); \ No newline at end of file diff --git a/e2e_test/streaming/nexmark/sinks/q106.slt.part b/e2e_test/streaming/nexmark/sinks/q106.slt.part new file mode 100644 index 000000000000..093f592fe28c --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q106.slt.part @@ -0,0 +1,20 @@ +statement ok +CREATE SINK nexmark_q106 +AS +SELECT + MIN(final) AS min_final +FROM + ( + SELECT + auction.id, + MAX(price) AS final + FROM + auction, + bid + WHERE + bid.auction = auction.id + AND bid.date_time BETWEEN auction.date_time AND auction.expires + GROUP BY + auction.id + ) +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); \ No newline at end of file diff --git a/e2e_test/streaming/nexmark/sinks/q14.slt.part b/e2e_test/streaming/nexmark/sinks/q14.slt.part new file mode 100644 index 000000000000..77047c7f3c81 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q14.slt.part @@ -0,0 +1,25 @@ +statement ok +CREATE SINK nexmark_q14 AS +SELECT + auction, + bidder, + 0.908 * price as price, + CASE + WHEN + extract(hour from date_time) >= 8 AND + extract(hour from date_time) <= 18 + THEN 'dayTime' + WHEN + extract(hour from date_time) <= 6 OR + extract(hour from date_time) >= 20 + THEN 'nightTime' + ELSE 'otherTime' + END AS bidTimeType, + date_time + -- extra + -- TODO: count_char is an UDF, add it back when we support similar functionality. + -- https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/java/com/github/nexmark/flink/udf/CountChar.java + -- count_char(extra, 'c') AS c_counts +FROM bid +WHERE 0.908 * price > 1000000 AND 0.908 * price < 50000000 +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q15.slt.part b/e2e_test/streaming/nexmark/sinks/q15.slt.part new file mode 100644 index 000000000000..77c31c6f2990 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q15.slt.part @@ -0,0 +1,19 @@ +statement ok +CREATE SINK nexmark_q15 AS +SELECT + to_char(date_time, 'YYYY-MM-DD') as "day", + count(*) AS total_bids, + count(*) filter (where price < 10000) AS rank1_bids, + count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids, + count(*) filter (where price >= 1000000) AS rank3_bids, + count(distinct bidder) AS total_bidders, + count(distinct bidder) filter (where price < 10000) AS rank1_bidders, + count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders, + count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders, + count(distinct auction) AS total_auctions, + count(distinct auction) filter (where price < 10000) AS rank1_auctions, + count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions, + count(distinct auction) filter (where price >= 1000000) AS rank3_auctions +FROM bid +GROUP BY to_char(date_time, 'YYYY-MM-DD') +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q16.slt.part b/e2e_test/streaming/nexmark/sinks/q16.slt.part new file mode 100644 index 000000000000..88b6f29362fc --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q16.slt.part @@ -0,0 +1,21 @@ +statement ok +CREATE SINK nexmark_q16 AS +SELECT + channel, + to_char(date_time, 'YYYY-MM-DD') as "day", + max(to_char(date_time, 'HH:mm')) as "minute", + count(*) AS total_bids, + count(*) filter (where price < 10000) AS rank1_bids, + count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids, + count(*) filter (where price >= 1000000) AS rank3_bids, + count(distinct bidder) AS total_bidders, + count(distinct bidder) filter (where price < 10000) AS rank1_bidders, + count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders, + count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders, + count(distinct auction) AS total_auctions, + count(distinct auction) filter (where price < 10000) AS rank1_auctions, + count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions, + count(distinct auction) filter (where price >= 1000000) AS rank3_auctions +FROM bid +GROUP BY channel, to_char(date_time, 'YYYY-MM-DD') +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q17.slt.part b/e2e_test/streaming/nexmark/sinks/q17.slt.part new file mode 100644 index 000000000000..b32bb80526f1 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q17.slt.part @@ -0,0 +1,16 @@ +statement ok +CREATE SINK nexmark_q17 AS +SELECT + auction, + to_char(date_time, 'YYYY-MM-DD') AS day, + count(*) AS total_bids, + count(*) filter (where price < 10000) AS rank1_bids, + count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids, + count(*) filter (where price >= 1000000) AS rank3_bids, + min(price) AS min_price, + max(price) AS max_price, + avg(price) AS avg_price, + sum(price) AS sum_price +FROM bid +GROUP BY auction, to_char(date_time, 'YYYY-MM-DD') +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q18.slt.part b/e2e_test/streaming/nexmark/sinks/q18.slt.part new file mode 100644 index 000000000000..46d2ad3b1c83 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q18.slt.part @@ -0,0 +1,14 @@ +statement ok +CREATE SINK nexmark_q18 AS +SELECT auction, bidder, price, channel, url, date_time, extra +FROM ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY bidder, auction + ORDER BY date_time DESC, extra + -- extra is addtionally added here to make the result deterministic + ) AS rank_number + FROM bid +) +WHERE rank_number <= 1 +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q2.slt.part b/e2e_test/streaming/nexmark/sinks/q2.slt.part new file mode 100644 index 000000000000..55a1e697fb22 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q2.slt.part @@ -0,0 +1,6 @@ +statement ok +CREATE SINK nexmark_q2 +AS +SELECT auction, price FROM bid +WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087 +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q20.slt.part b/e2e_test/streaming/nexmark/sinks/q20.slt.part new file mode 100644 index 000000000000..364a25e787ea --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q20.slt.part @@ -0,0 +1,9 @@ +statement ok +CREATE SINK nexmark_q20 AS +SELECT + auction, bidder, price, channel, url, B.date_time as bid_date_time, B.extra as bid_extra, + item_name, description, initial_bid, reserve, A.date_time as auction_date_time, expires, seller, category, A.extra as auction_extra +FROM + bid AS B INNER JOIN auction AS A on B.auction = A.id +WHERE A.category = 10 +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q21.slt.part b/e2e_test/streaming/nexmark/sinks/q21.slt.part new file mode 100644 index 000000000000..b1bfc2aadada --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q21.slt.part @@ -0,0 +1,17 @@ +statement ok +CREATE SINK nexmark_q21 AS +SELECT + auction, bidder, price, channel, + CASE + WHEN LOWER(channel) = 'apple' THEN '0' + WHEN LOWER(channel) = 'google' THEN '1' + WHEN LOWER(channel) = 'facebook' THEN '2' + WHEN LOWER(channel) = 'baidu' THEN '3' + ELSE (regexp_match(url, '(&|^)channel_id=([^&]*)'))[2] + END + AS channel_id +FROM + bid +WHERE + (regexp_match(url, '(&|^)channel_id=([^&]*)'))[2] is not null or LOWER(channel) in ('apple', 'google', 'facebook', 'baidu') +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q22.slt.part b/e2e_test/streaming/nexmark/sinks/q22.slt.part new file mode 100644 index 000000000000..b6086b6a9ee0 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q22.slt.part @@ -0,0 +1,13 @@ +# The third parameter of split_index function in Flink SQL is a cardinal number, starting from 0 +# The third parameter of split_part function in PostgreSQL is an ordinal number, starting from 1 + +statement ok +CREATE SINK nexmark_q22 AS +SELECT + auction, bidder, price, channel, + split_part(url, '/', 4) as dir1, + split_part(url, '/', 5) as dir2, + split_part(url, '/', 6) as dir3 +FROM + bid +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q3.slt.part b/e2e_test/streaming/nexmark/sinks/q3.slt.part new file mode 100644 index 000000000000..eabea1fea180 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q3.slt.part @@ -0,0 +1,10 @@ +statement ok +CREATE SINK nexmark_q3 +AS +SELECT + P.name, P.city, P.state, A.id +FROM + auction AS A INNER JOIN person AS P on A.seller = P.id +WHERE + A.category = 10 and (P.state = 'or' OR P.state = 'id' OR P.state = 'ca') +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q4.slt.part b/e2e_test/streaming/nexmark/sinks/q4.slt.part new file mode 100644 index 000000000000..86b443554cd3 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q4.slt.part @@ -0,0 +1,21 @@ +statement ok +CREATE SINK nexmark_q4 +AS +SELECT + Q.category, + AVG(Q.final) as avg +FROM ( + SELECT + MAX(B.price) AS final, A.category + FROM + auction A, + bid B + WHERE + A.id = B.auction AND + B.date_time BETWEEN A.date_time AND A.expires + GROUP BY + A.id, A.category + ) Q +GROUP BY + Q.category +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q5.slt.part b/e2e_test/streaming/nexmark/sinks/q5.slt.part new file mode 100644 index 000000000000..271f231ab7c0 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q5.slt.part @@ -0,0 +1,36 @@ +statement ok +CREATE SINK nexmark_q5 +AS +SELECT + AuctionBids.auction, AuctionBids.num +FROM ( + SELECT + bid.auction, + count(*) AS num, + window_start AS starttime + FROM + HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND) + GROUP BY + bid.auction, + window_start +) AS AuctionBids +JOIN ( + SELECT + max(CountBids.num) AS maxn, + CountBids.starttime_c + FROM ( + SELECT + count(*) AS num, + window_start AS starttime_c + FROM HOP(bid, date_time, INTERVAL '2' SECOND, INTERVAL '10' SECOND) + GROUP BY + bid.auction, + window_start + ) AS CountBids + GROUP BY + CountBids.starttime_c + ) AS MaxBids +ON + AuctionBids.starttime = MaxBids.starttime_c AND + AuctionBids.num >= MaxBids.maxn +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q7.slt.part b/e2e_test/streaming/nexmark/sinks/q7.slt.part new file mode 100644 index 000000000000..fea150b3a8bf --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q7.slt.part @@ -0,0 +1,23 @@ +statement ok +CREATE SINK nexmark_q7 +AS +SELECT + B.auction, + B.price, + B.bidder, + B.date_time +from + bid B + JOIN ( + SELECT + MAX(price) AS maxprice, + window_end as date_time + FROM + TUMBLE(bid, date_time, INTERVAL '10' SECOND) + GROUP BY + window_end + ) B1 ON B.price = B1.maxprice +WHERE + B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND + AND B1.date_time +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q8.slt.part b/e2e_test/streaming/nexmark/sinks/q8.slt.part new file mode 100644 index 000000000000..8826dbb3d0bc --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q8.slt.part @@ -0,0 +1,37 @@ +statement ok +CREATE SINK nexmark_q8 +AS +SELECT + P.id, + P.name, + P.starttime +FROM + ( + SELECT + id, + name, + window_start AS starttime, + window_end AS endtime + FROM + TUMBLE(person, date_time, INTERVAL '10' SECOND) + GROUP BY + id, + name, + window_start, + window_end + ) P + JOIN ( + SELECT + seller, + window_start AS starttime, + window_end AS endtime + FROM + TUMBLE(auction, date_time, INTERVAL '10' SECOND) + GROUP BY + seller, + window_start, + window_end + ) A ON P.id = A.seller + AND P.starttime = A.starttime + AND P.endtime = A.endtime +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true'); diff --git a/e2e_test/streaming/nexmark/sinks/q9.slt.part b/e2e_test/streaming/nexmark/sinks/q9.slt.part new file mode 100644 index 000000000000..86f760769de8 --- /dev/null +++ b/e2e_test/streaming/nexmark/sinks/q9.slt.part @@ -0,0 +1,14 @@ +statement ok +CREATE SINK nexmark_q9 +AS +SELECT + id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, + auction, bidder, price, bid_date_time +FROM ( + SELECT A.*, B.auction, B.bidder, B.price, B.date_time AS bid_date_time, + ROW_NUMBER() OVER (PARTITION BY A.id ORDER BY B.price DESC, B.date_time ASC) AS rownum + FROM auction A, bid B + WHERE A.id = B.auction AND B.date_time BETWEEN A.date_time AND A.expires +) tmp +WHERE rownum <= 1 +WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');