-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-34796][SQL][3.1] Initialize counter variable for LIMIT code-gen in doProduce() #31911
Conversation
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #136298 has finished for PR 31911 at commit
|
retest this please |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #136302 has finished for PR 31911 at commit
|
…n in doProduce() ### What changes were proposed in this pull request? This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed. Here is an example: ``` test("failed limit query") { withTable("left_table", "empty_right_table", "output_table") { spark.range(5).toDF("k").write.saveAsTable("left_table") spark.range(0).toDF("k").write.saveAsTable("empty_right_table") withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.sql("CREATE TABLE output_table (k INT) USING parquet") spark.sql( s""" |INSERT INTO TABLE output_table |SELECT t1.k FROM left_table t1 |JOIN empty_right_table t2 |ON t1.k = t2.k |LIMIT 3 |""".stripMargin) } } } ``` Query plan: ``` Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable( Database: default Table: output_table Created Time: Thu Mar 18 21:46:26 PDT 2021 Last Access: UNKNOWN Created By: Spark 3.2.0-SNAPSHOT Type: MANAGED Provider: parquet Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table Schema: root |-- k: integer (nullable = true) ), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k] +- *(3) Project [ansi_cast(k#228L as int) AS k#231] +- *(3) GlobalLimit 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#179] +- *(2) LocalLimit 3 +- *(2) Project [k#228L] +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false :- *(2) Filter isnotnull(k#228L) : +- *(2) ColumnarToRow : +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#173] +- *(1) Filter isnotnull(k#229L) +- *(1) ColumnarToRow +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint> ``` Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 . The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable. The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called. Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled. ### Why are the changes needed? Fix query failure. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `SQLQuerySuite.scala`. Closes #31911 from c21/limit-fix-3.1. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
Thanks! Merged to branch-3.1. |
Thank you @maropu for review! |
…n in doProduce() This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed. Here is an example: ``` test("failed limit query") { withTable("left_table", "empty_right_table", "output_table") { spark.range(5).toDF("k").write.saveAsTable("left_table") spark.range(0).toDF("k").write.saveAsTable("empty_right_table") withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.sql("CREATE TABLE output_table (k INT) USING parquet") spark.sql( s""" |INSERT INTO TABLE output_table |SELECT t1.k FROM left_table t1 |JOIN empty_right_table t2 |ON t1.k = t2.k |LIMIT 3 |""".stripMargin) } } } ``` Query plan: ``` Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable( Database: default Table: output_table Created Time: Thu Mar 18 21:46:26 PDT 2021 Last Access: UNKNOWN Created By: Spark 3.2.0-SNAPSHOT Type: MANAGED Provider: parquet Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table Schema: root |-- k: integer (nullable = true) ), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k] +- *(3) Project [ansi_cast(k#228L as int) AS k#231] +- *(3) GlobalLimit 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=apache#179] +- *(2) LocalLimit 3 +- *(2) Project [k#228L] +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false :- *(2) Filter isnotnull(k#228L) : +- *(2) ColumnarToRow : +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=apache#173] +- *(1) Filter isnotnull(k#229L) +- *(1) ColumnarToRow +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint> ``` Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 . The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable. The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called. Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled. Fix query failure. No. Added unit test in `SQLQuerySuite.scala`. Closes apache#31911 from c21/limit-fix-3.1. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…n in doProduce() ### What changes were proposed in this pull request? This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g. `ColumnarToRowExec` operator for early termination), but in the same stage, there can be some operators doing the shortcut and not calling `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So if we have query that `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, the whole stage code-gen compilation will be failed. Here is an example: ``` test("failed limit query") { withTable("left_table", "empty_right_table", "output_table") { spark.range(5).toDF("k").write.saveAsTable("left_table") spark.range(0).toDF("k").write.saveAsTable("empty_right_table") withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.sql("CREATE TABLE output_table (k INT) USING parquet") spark.sql( s""" |INSERT INTO TABLE output_table |SELECT t1.k FROM left_table t1 |JOIN empty_right_table t2 |ON t1.k = t2.k |LIMIT 3 |""".stripMargin) } } } ``` Query plan: ``` Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable( Database: default Table: output_table Created Time: Thu Mar 18 21:46:26 PDT 2021 Last Access: UNKNOWN Created By: Spark 3.2.0-SNAPSHOT Type: MANAGED Provider: parquet Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table Schema: root |-- k: integer (nullable = true) ), org.apache.spark.sql.execution.datasources.InMemoryFileIndexb25d08b, [k] +- *(3) Project [ansi_cast(k#228L as int) AS k#231] +- *(3) GlobalLimit 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=apache#179] +- *(2) LocalLimit 3 +- *(2) Project [k#228L] +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false :- *(2) Filter isnotnull(k#228L) : +- *(2) ColumnarToRow : +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=apache#173] +- *(1) Filter isnotnull(k#229L) +- *(1) ColumnarToRow +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint> ``` Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 . The uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` does not call `LocalLimitExec.doConsume()` to initialize the counter variable. The fix is to move the counter variable initialization to `doProduce()`, as in whole stage code-gen framework, `doProduce()` will definitely be called if upstream operators `doProduce()`/`doConsume()` is called. Note: this only happens in AQE disabled case, because we have an AQE optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) to change the whole query to an empty `LocalRelation` if inner join broadcast side is empty with AQE enabled. ### Why are the changes needed? Fix query failure. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `SQLQuerySuite.scala`. Closes apache#31911 from c21/limit-fix-3.1. Authored-by: Cheng Su <chengsu@fb.com> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
What changes were proposed in this pull request?
This PR is to fix the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from
BaseLimitExec
is not initialized but used in code-gen. This is because the limit counter variable will be used in upstream operators (LIMIT's child plan, e.g.ColumnarToRowExec
operator for early termination), but in the same stage, there can be some operators doing the shortcut and not callingBaseLimitExec
'sdoConsume()
, e.g. HashJoin.codegenInner. So if we have query thatLocalLimit - BroadcastHashJoin - FileScan
in the same stage, the whole stage code-gen compilation will be failed.Here is an example:
Query plan:
Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .
The uninitialized variable
_limit_counter_1
fromLocalLimitExec
is referenced inColumnarToRowExec
, butBroadcastHashJoinExec
does not callLocalLimitExec.doConsume()
to initialize the counter variable.The fix is to move the counter variable initialization to
doProduce()
, as in whole stage code-gen framework,doProduce()
will definitely be called if upstream operatorsdoProduce()
/doConsume()
is called.Note: this only happens in AQE disabled case, because we have an AQE optimization rule EliminateUnnecessaryJoin to change the whole query to an empty
LocalRelation
if inner join broadcast side is empty with AQE enabled.Why are the changes needed?
Fix query failure.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit test in
SQLQuerySuite.scala
.