Skip to content

Commit

Permalink
[SPARK-44448][SQL] Fix wrong results bug from DenseRankLimitIterator …
Browse files Browse the repository at this point in the history
…and InferWindowGroupLimit

### What changes were proposed in this pull request?
Top-k filters on a dense_rank() window function return wrong results, due to a bug in optimization InferWindowGroupLimit, specifically in the code for DenseRankLimitIterator, introduced in https://issues.apache.org/jira/browse/SPARK-37099.

The bug is in DenseRankLimitIterator, it fails to reset state properly when transitioning from one window partition to the next. reset only resets rank = 0, what it is missing is to reset currentRankRow = null. This means that when processing the second and later window partitions, the rank incorrectly gets incremented based on comparing the ordering of the last row of the previous partition to the first row of the new partition.

This means that a dense_rank window func that has more than one window partition and more than one row with dense_rank = 1 in the second or later partitions can give wrong results when optimized.

RankLimitIterator narrowly avoids this bug by happenstance, the first row in the new partition will try to increment rank, but increment it by the value of count which is 0, so it happens to work by accident. This PR also fixes the reset function in RankLimitIterator to make it more robust.

Example repro:
```
create or replace temp view t1 (p, o) as values (1, 1), (1, 1), (1, 2), (2, 1), (2, 1), (2, 2);

select * from (select *, dense_rank() over (partition by p order by o) as rnk from t1) where rnk = 1;
```

Spark result:
```
[1,1,1]
[1,1,1]
[2,1,1]
```

Correct result:
```
[1,1,1]
[1,1,1]
[2,1,1]
[2,1,1]
```

### Why are the changes needed?
Fix wrong results bug.

### Does this PR introduce _any_ user-facing change?
Yes, fixes wrong results.

### How was this patch tested?
Add sql tests and unit tests.

Unfortunately, the previous tests for the optimization only had a single row per rank, so did not catch the bug as the bug requires multiple rows per rank. This PR strengthens the tests with data that contains multiple rows per rank.

Closes #42026 from jchen5/dense-rank-limit.

Authored-by: Jack Chen <jack.chen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
jchen5 authored and cloud-fan committed Jul 19, 2023
1 parent ede82a9 commit f35c814
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ case class RankLimitIterator(
override def reset(): Unit = {
rank = 0
count = 0
currentRankRow = null
}
}

Expand All @@ -193,6 +194,7 @@ case class DenseRankLimitIterator(

override def reset(): Unit = {
rank = 0
currentRankRow = null
}
}

Expand Down
131 changes: 131 additions & 0 deletions sql/core/src/test/resources/sql-tests/analyzer-results/window.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -1279,3 +1279,134 @@ org.apache.spark.sql.AnalysisException
"windowName" : "w"
}
}


-- !query
create or replace temp view t1 (p, o) as values (1, 1), (1, 1), (1, 2), (2, 1), (2, 1), (2, 2)
-- !query analysis
CreateViewCommand `t1`, [(p,None), (o,None)], values (1, 1), (1, 1), (1, 2), (2, 1), (2, 1), (2, 2), false, true, LocalTempView, true
+- LocalRelation [col1#x, col2#x]


-- !query
select * from (select *, dense_rank() over (partition by p order by o) as rnk from t1) where rnk = 1
-- !query analysis
Project [p#x, o#x, rnk#x]
+- Filter (rnk#x = 1)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [p#x, o#x, rnk#x]
+- Project [p#x, o#x, rnk#x, rnk#x]
+- Window [dense_rank(o#x) windowspecdefinition(p#x, o#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#x], [p#x], [o#x ASC NULLS FIRST]
+- Project [p#x, o#x]
+- SubqueryAlias t1
+- View (`t1`, [p#x,o#x])
+- Project [cast(col1#x as int) AS p#x, cast(col2#x as int) AS o#x]
+- LocalRelation [col1#x, col2#x]


-- !query
SELECT * FROM (SELECT cate, val, rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1
-- !query analysis
Project [cate#x, val#x, r#x]
+- Filter (r#x = 1)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cate#x, val#x, r#x]
+- Project [cate#x, val#x, r#x, r#x]
+- Window [rank(val#x) windowspecdefinition(cate#x, val#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS r#x], [cate#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x]
+- SubqueryAlias testdata
+- View (`testData`, [val#x,val_long#xL,val_double#x,val_date#x,val_timestamp#x,cate#x])
+- Project [cast(val#x as int) AS val#x, cast(val_long#xL as bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x, cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS val_timestamp#x, cast(cate#x as string) AS cate#x]
+- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]


-- !query
SELECT * FROM (SELECT cate, val, rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2
-- !query analysis
Project [cate#x, val#x, r#x]
+- Filter (r#x <= 2)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cate#x, val#x, r#x]
+- Project [cate#x, val#x, r#x, r#x]
+- Window [rank(val#x) windowspecdefinition(cate#x, val#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS r#x], [cate#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x]
+- SubqueryAlias testdata
+- View (`testData`, [val#x,val_long#xL,val_double#x,val_date#x,val_timestamp#x,cate#x])
+- Project [cast(val#x as int) AS val#x, cast(val_long#xL as bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x, cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS val_timestamp#x, cast(cate#x as string) AS cate#x]
+- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]


-- !query
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1
-- !query analysis
Project [cate#x, val#x, r#x]
+- Filter (r#x = 1)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cate#x, val#x, r#x]
+- Project [cate#x, val#x, r#x, r#x]
+- Window [dense_rank(val#x) windowspecdefinition(cate#x, val#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS r#x], [cate#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x]
+- SubqueryAlias testdata
+- View (`testData`, [val#x,val_long#xL,val_double#x,val_date#x,val_timestamp#x,cate#x])
+- Project [cast(val#x as int) AS val#x, cast(val_long#xL as bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x, cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS val_timestamp#x, cast(cate#x as string) AS cate#x]
+- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]


-- !query
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2
-- !query analysis
Project [cate#x, val#x, r#x]
+- Filter (r#x <= 2)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cate#x, val#x, r#x]
+- Project [cate#x, val#x, r#x, r#x]
+- Window [dense_rank(val#x) windowspecdefinition(cate#x, val#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS r#x], [cate#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x]
+- SubqueryAlias testdata
+- View (`testData`, [val#x,val_long#xL,val_double#x,val_date#x,val_timestamp#x,cate#x])
+- Project [cast(val#x as int) AS val#x, cast(val_long#xL as bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x, cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS val_timestamp#x, cast(cate#x as string) AS cate#x]
+- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]


-- !query
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1
-- !query analysis
Project [cate#x, val#x, r#x]
+- Filter (r#x = 1)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cate#x, val#x, r#x]
+- Project [cate#x, val#x, r#x, r#x]
+- Window [row_number() windowspecdefinition(cate#x, val#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS r#x], [cate#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x]
+- SubqueryAlias testdata
+- View (`testData`, [val#x,val_long#xL,val_double#x,val_date#x,val_timestamp#x,cate#x])
+- Project [cast(val#x as int) AS val#x, cast(val_long#xL as bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x, cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS val_timestamp#x, cast(cate#x as string) AS cate#x]
+- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]


-- !query
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2
-- !query analysis
Project [cate#x, val#x, r#x]
+- Filter (r#x <= 2)
+- SubqueryAlias __auto_generated_subquery_name
+- Project [cate#x, val#x, r#x]
+- Project [cate#x, val#x, r#x, r#x]
+- Window [row_number() windowspecdefinition(cate#x, val#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS r#x], [cate#x], [val#x ASC NULLS FIRST]
+- Project [cate#x, val#x]
+- SubqueryAlias testdata
+- View (`testData`, [val#x,val_long#xL,val_double#x,val_date#x,val_timestamp#x,cate#x])
+- Project [cast(val#x as int) AS val#x, cast(val_long#xL as bigint) AS val_long#xL, cast(val_double#x as double) AS val_double#x, cast(val_date#x as date) AS val_date#x, cast(val_timestamp#x as timestamp) AS val_timestamp#x, cast(cate#x as string) AS cate#x]
+- Project [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
+- SubqueryAlias testData
+- LocalRelation [val#x, val_long#xL, val_double#x, val_date#x, val_timestamp#x, cate#x]
13 changes: 13 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/window.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
--CONFIG_DIM1 spark.sql.codegen.wholeStage=true
--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
--CONFIG_DIM1 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=NO_CODEGEN
--CONFIG_DIM2 spark.sql.optimizer.windowGroupLimitThreshold=-1
--CONFIG_DIM2 spark.sql.optimizer.windowGroupLimitThreshold=1000

-- Test data.
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
Expand Down Expand Up @@ -465,3 +467,14 @@ SELECT
SUM(salary) OVER w sum_salary
FROM
basic_pays;

-- Test cases for InferWindowGroupLimit
create or replace temp view t1 (p, o) as values (1, 1), (1, 1), (1, 2), (2, 1), (2, 1), (2, 2);
select * from (select *, dense_rank() over (partition by p order by o) as rnk from t1) where rnk = 1;

SELECT * FROM (SELECT cate, val, rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1;
SELECT * FROM (SELECT cate, val, rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2;
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1;
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2;
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1;
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2;
90 changes: 90 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/window.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -1342,3 +1342,93 @@ org.apache.spark.sql.AnalysisException
"windowName" : "w"
}
}


-- !query
create or replace temp view t1 (p, o) as values (1, 1), (1, 1), (1, 2), (2, 1), (2, 1), (2, 2)
-- !query schema
struct<>
-- !query output



-- !query
select * from (select *, dense_rank() over (partition by p order by o) as rnk from t1) where rnk = 1
-- !query schema
struct<p:int,o:int,rnk:int>
-- !query output
1 1 1
1 1 1
2 1 1
2 1 1


-- !query
SELECT * FROM (SELECT cate, val, rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1
-- !query schema
struct<cate:string,val:int,r:int>
-- !query output
NULL NULL 1
a NULL 1
b 1 1


-- !query
SELECT * FROM (SELECT cate, val, rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2
-- !query schema
struct<cate:string,val:int,r:int>
-- !query output
NULL 3 2
NULL NULL 1
a 1 2
a 1 2
a NULL 1
b 1 1
b 2 2


-- !query
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1
-- !query schema
struct<cate:string,val:int,r:int>
-- !query output
NULL NULL 1
a NULL 1
b 1 1


-- !query
SELECT * FROM (SELECT cate, val, dense_rank() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2
-- !query schema
struct<cate:string,val:int,r:int>
-- !query output
NULL 3 2
NULL NULL 1
a 1 2
a 1 2
a NULL 1
b 1 1
b 2 2


-- !query
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r = 1
-- !query schema
struct<cate:string,val:int,r:int>
-- !query output
NULL NULL 1
a NULL 1
b 1 1


-- !query
SELECT * FROM (SELECT cate, val, row_number() OVER(PARTITION BY cate ORDER BY val) as r FROM testData) where r <= 2
-- !query schema
struct<cate:string,val:int,r:int>
-- !query output
NULL 3 2
NULL NULL 1
a 1 2
a NULL 1
b 1 1
b 2 2
Loading

0 comments on commit f35c814

Please sign in to comment.