test: add create and drop sink test for nexmark (#8686)
lmatz authored Apr 11, 2023
1 parent a426bef commit 07cd8b7
Showing 38 changed files with 535 additions and 9 deletions.
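
Each new e2e_test/source/nexmark_endless_sinks/*.slt file below follows the same shape: include the Nexmark sources, create a batch of blackhole sinks, let them run for 20 seconds, then drop the sinks and the sources. A minimal sketch of that create-and-drop cycle for a single query, assuming the bid source is provided by nexmark/create_sources.slt.part (the sink definition matches q0.slt.part later in this diff):

# sketch only — bid is assumed to come from ../../nexmark/create_sources.slt.part
statement ok
CREATE SINK nexmark_q0 AS
SELECT auction, bidder, price, date_time FROM bid
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

sleep 20s

statement ok
DROP SINK nexmark_q0;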
9 changes: 9 additions & 0 deletions ci/scripts/e2e-sink-test.sh
@@ -103,3 +103,12 @@ fi
echo "--- Kill cluster"
cargo make ci-kill
pkill -f connector-node

echo "--- e2e, ci-1cn-1fe, nexmark endless"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-1cn-1fe
sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_mvs/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_sinks/*.slt'

echo "--- Kill cluster"
cargo make ci-kill
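
For reference, this new CI step can be reproduced locally with the same commands, assuming a RisingWave checkout with cargo-make and the sqllogictest binary installed; a hedged sketch (not part of the commit), running a single test file instead of the glob:

# start a 1-compute-node / 1-frontend dev cluster, run one endless-sink test file, then tear down
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-1cn-1fe
sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless_sinks/nexmark_endless_part1.slt'
cargo make ci-kill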
10 changes: 3 additions & 7 deletions ci/scripts/e2e-source-test.sh
@@ -54,6 +54,7 @@ node_port=50051
node_timeout=10

echo "--- starting risingwave cluster with connector node"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-1cn-1fe-with-recovery
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 &

@@ -93,6 +94,7 @@ mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cd
psql -h db -U postgres -d cdc_test < ./e2e_test/source/cdc/postgres_cdc_insert.sql

# start cluster w/o clean-data
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make dev ci-1cn-1fe-with-recovery
echo "wait for recovery finish"
sleep 20
@@ -104,14 +106,8 @@ echo "--- Kill cluster"
cargo make ci-kill
pkill -f connector-node

echo "--- e2e, ci-1cn-1fe, nexmark endless"
cargo make ci-start ci-1cn-1fe
sqllogictest -p 4566 -d dev './e2e_test/source/nexmark_endless/*.slt'

echo "--- Kill cluster"
cargo make ci-kill

echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source"
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
cargo make ci-start ci-kafka-plus-pubsub
./scripts/source/prepare_ci_kafka.sh
cargo run --bin prepare_ci_pubsub
2 changes: 1 addition & 1 deletion ci/workflows/main.yml
@@ -265,7 +265,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 5
timeout_in_minutes: 18

- label: "end-to-end iceberg sink test (release mode)"
command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-release"
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -154,7 +154,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 5
timeout_in_minutes: 18

- label: "end-to-end iceberg sink test"
command: "ci/scripts/e2e-iceberg-sink-test.sh -p ci-dev"
22 changes: 22 additions & 0 deletions e2e_test/source/nexmark_endless_sinks/nexmark_endless_part1.slt
@@ -0,0 +1,22 @@
include ../../nexmark/create_sources.slt.part

include ../../streaming/nexmark/sinks/q0.slt.part
include ../../streaming/nexmark/sinks/q1.slt.part
include ../../streaming/nexmark/sinks/q2.slt.part
include ../../streaming/nexmark/sinks/q3.slt.part

sleep 20s

statement ok
drop sink nexmark_q0;

statement ok
drop sink nexmark_q1;

statement ok
drop sink nexmark_q2;

statement ok
drop sink nexmark_q3;

include ../../nexmark/drop_sources.slt.part
22 changes: 22 additions & 0 deletions e2e_test/source/nexmark_endless_sinks/nexmark_endless_part2.slt
@@ -0,0 +1,22 @@
include ../../nexmark/create_sources.slt.part

include ../../streaming/nexmark/sinks/q4.slt.part
include ../../streaming/nexmark/sinks/q5.slt.part
include ../../streaming/nexmark/sinks/q7.slt.part
include ../../streaming/nexmark/sinks/q8.slt.part

sleep 20s

statement ok
drop sink nexmark_q4;

statement ok
drop sink nexmark_q5;

statement ok
drop sink nexmark_q7;

statement ok
drop sink nexmark_q8;

include ../../nexmark/drop_sources.slt.part
22 changes: 22 additions & 0 deletions e2e_test/source/nexmark_endless_sinks/nexmark_endless_part3.slt
@@ -0,0 +1,22 @@
include ../../nexmark/create_sources.slt.part

include ../../streaming/nexmark/sinks/q9.slt.part
include ../../streaming/nexmark/sinks/q10.slt.part
include ../../streaming/nexmark/sinks/q14.slt.part
include ../../streaming/nexmark/sinks/q15.slt.part

sleep 20s

statement ok
drop sink nexmark_q9;

statement ok
drop sink nexmark_q10;

statement ok
drop sink nexmark_q14;

statement ok
drop sink nexmark_q15;

include ../../nexmark/drop_sources.slt.part
22 changes: 22 additions & 0 deletions e2e_test/source/nexmark_endless_sinks/nexmark_endless_part4.slt
@@ -0,0 +1,22 @@
include ../../nexmark/create_sources.slt.part

include ../../streaming/nexmark/sinks/q16.slt.part
include ../../streaming/nexmark/sinks/q17.slt.part
include ../../streaming/nexmark/sinks/q18.slt.part
include ../../streaming/nexmark/sinks/q20.slt.part

sleep 20s

statement ok
drop sink nexmark_q16;

statement ok
drop sink nexmark_q17;

statement ok
drop sink nexmark_q18;

statement ok
drop sink nexmark_q20;

include ../../nexmark/drop_sources.slt.part
22 changes: 22 additions & 0 deletions e2e_test/source/nexmark_endless_sinks/nexmark_endless_part5.slt
@@ -0,0 +1,22 @@
include ../../nexmark/create_sources.slt.part

include ../../streaming/nexmark/sinks/q21.slt.part
include ../../streaming/nexmark/sinks/q22.slt.part
include ../../streaming/nexmark/sinks/q101.slt.part
include ../../streaming/nexmark/sinks/q102.slt.part

sleep 20s

statement ok
drop sink nexmark_q21;

statement ok
drop sink nexmark_q22;

statement ok
drop sink nexmark_q101;

statement ok
drop sink nexmark_q102;

include ../../nexmark/drop_sources.slt.part
22 changes: 22 additions & 0 deletions e2e_test/source/nexmark_endless_sinks/nexmark_endless_part6.slt
@@ -0,0 +1,22 @@
include ../../nexmark/create_sources.slt.part

include ../../streaming/nexmark/sinks/q103.slt.part
include ../../streaming/nexmark/sinks/q104.slt.part
include ../../streaming/nexmark/sinks/q105.slt.part
include ../../streaming/nexmark/sinks/q106.slt.part

sleep 20s

statement ok
drop sink nexmark_q103;

statement ok
drop sink nexmark_q104;

statement ok
drop sink nexmark_q105;

statement ok
drop sink nexmark_q106;

include ../../nexmark/drop_sources.slt.part
5 changes: 5 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q0.slt.part
@@ -0,0 +1,5 @@
statement ok
CREATE SINK nexmark_q0
AS
SELECT auction, bidder, price, date_time FROM bid
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
10 changes: 10 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q1.slt.part
@@ -0,0 +1,10 @@
statement ok
CREATE SINK nexmark_q1
AS
SELECT
auction,
bidder,
0.908 * price as price,
date_time
FROM bid
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
4 changes: 4 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q10.slt.part
@@ -0,0 +1,4 @@
statement ok
CREATE SINK nexmark_q10 AS
SELECT auction, bidder, price, date_time, TO_CHAR(date_time, 'YYYY-MM-DD') as date, TO_CHAR(date_time, 'HH:MI') as time FROM bid
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
16 changes: 16 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q101.slt.part
@@ -0,0 +1,16 @@
statement ok
CREATE SINK nexmark_q101
AS
SELECT
a.id AS auction_id,
a.item_name AS auction_item_name,
b.max_price AS current_highest_bid
FROM auction a
LEFT OUTER JOIN (
SELECT
b1.auction,
MAX(b1.price) max_price
FROM bid b1
GROUP BY b1.auction
) b ON a.id = b.auction
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
14 changes: 14 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q102.slt.part
@@ -0,0 +1,14 @@
statement ok
CREATE SINK nexmark_q102
AS
SELECT
a.id AS auction_id,
a.item_name AS auction_item_name,
COUNT(b.auction) AS bid_count
FROM auction a
JOIN bid b ON a.id = b.auction
GROUP BY a.id, a.item_name
HAVING COUNT(b.auction) >= (
SELECT COUNT(*) / COUNT(DISTINCT auction) FROM bid
)
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
13 changes: 13 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q103.slt.part
@@ -0,0 +1,13 @@
statement ok
CREATE SINK nexmark_q103
AS
SELECT
a.id AS auction_id,
a.item_name AS auction_item_name
FROM auction a
WHERE a.id IN (
SELECT b.auction FROM bid b
GROUP BY b.auction
HAVING COUNT(*) >= 20
)
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
13 changes: 13 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q104.slt.part
@@ -0,0 +1,13 @@
statement ok
CREATE SINK nexmark_q104
AS
SELECT
a.id AS auction_id,
a.item_name AS auction_item_name
FROM auction a
WHERE a.id NOT IN (
SELECT b.auction FROM bid b
GROUP BY b.auction
HAVING COUNT(*) < 20
)
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
13 changes: 13 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q105.slt.part
@@ -0,0 +1,13 @@
statement ok
CREATE SINK nexmark_q105
AS
SELECT
a.id AS auction_id,
a.item_name AS auction_item_name,
COUNT(b.auction) AS bid_count
FROM auction a
JOIN bid b ON a.id = b.auction
GROUP BY a.id, a.item_name
ORDER BY bid_count DESC
LIMIT 1000
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
20 changes: 20 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q106.slt.part
@@ -0,0 +1,20 @@
statement ok
CREATE SINK nexmark_q106
AS
SELECT
MIN(final) AS min_final
FROM
(
SELECT
auction.id,
MAX(price) AS final
FROM
auction,
bid
WHERE
bid.auction = auction.id
AND bid.date_time BETWEEN auction.date_time AND auction.expires
GROUP BY
auction.id
)
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
25 changes: 25 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q14.slt.part
@@ -0,0 +1,25 @@
statement ok
CREATE SINK nexmark_q14 AS
SELECT
auction,
bidder,
0.908 * price as price,
CASE
WHEN
extract(hour from date_time) >= 8 AND
extract(hour from date_time) <= 18
THEN 'dayTime'
WHEN
extract(hour from date_time) <= 6 OR
extract(hour from date_time) >= 20
THEN 'nightTime'
ELSE 'otherTime'
END AS bidTimeType,
date_time
-- extra
-- TODO: count_char is an UDF, add it back when we support similar functionality.
-- https://github.com/nexmark/nexmark/blob/master/nexmark-flink/src/main/java/com/github/nexmark/flink/udf/CountChar.java
-- count_char(extra, 'c') AS c_counts
FROM bid
WHERE 0.908 * price > 1000000 AND 0.908 * price < 50000000
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
19 changes: 19 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q15.slt.part
@@ -0,0 +1,19 @@
statement ok
CREATE SINK nexmark_q15 AS
SELECT
to_char(date_time, 'YYYY-MM-DD') as "day",
count(*) AS total_bids,
count(*) filter (where price < 10000) AS rank1_bids,
count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
count(*) filter (where price >= 1000000) AS rank3_bids,
count(distinct bidder) AS total_bidders,
count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
count(distinct auction) AS total_auctions,
count(distinct auction) filter (where price < 10000) AS rank1_auctions,
count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY to_char(date_time, 'YYYY-MM-DD')
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
21 changes: 21 additions & 0 deletions e2e_test/streaming/nexmark/sinks/q16.slt.part
@@ -0,0 +1,21 @@
statement ok
CREATE SINK nexmark_q16 AS
SELECT
channel,
to_char(date_time, 'YYYY-MM-DD') as "day",
max(to_char(date_time, 'HH:mm')) as "minute",
count(*) AS total_bids,
count(*) filter (where price < 10000) AS rank1_bids,
count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
count(*) filter (where price >= 1000000) AS rank3_bids,
count(distinct bidder) AS total_bidders,
count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
count(distinct auction) AS total_auctions,
count(distinct auction) filter (where price < 10000) AS rank1_auctions,
count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY channel, to_char(date_time, 'YYYY-MM-DD')
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');