[BUG] TPC-DS query 90 with AQE enabled fails with doExecuteBroadcast exception #1035

Closed
chenrui17 opened this issue Oct 28, 2020 · 3 comments · Fixed by #1042
Assignees
Labels
bug Something isn't working P0 Must have for release

Comments

@chenrui17

Describe the bug
tpc-ds-Q90_aqe_on.txt

Steps/Code to reproduce bug
Run TPC-DS query 90 with AQE enabled.
Scale factor: 1000
RAPIDS plugin: branch-0.3, latest version
Spark: 3.0.1-rc3

Expected behavior
The query executes successfully.

@chenrui17 chenrui17 added ? - Needs Triage Need team to review and classify bug Something isn't working labels Oct 28, 2020
@chenrui17 chenrui17 changed the title to [BUG] UnsupportedOperationException: WholeStageCodegen (2) does not implement doExecuteBroadcast when run tpc-ds query-90 with AQE ON Oct 28, 2020
@jlowe
Member

jlowe commented Oct 28, 2020

Pasting the relevant exception from the log for reference.

20/10/28 17:21:14 ERROR execution.datasources.FileFormatWriter [pool-2-thread-1]: Aborting job c7d00a2e-e4ea-4c25-92b4-09f4e8aa3e0f.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ShuffleQueryStage 9
+- Exchange RoundRobinPartitioning(208), false, [id=#1450]
   +- TakeOrderedAndProject(limit=100, orderBy=[am_pm_ratio#2 ASC NULLS FIRST], output=[am_pm_ratio#2])
      +- *(3) Project [CheckOverflow((promote_precision(cast(amc#0L as decimal(15,4))) / promote_precision(cast(pmc#1L as decimal(15,4)))), DecimalType(35,20), true) AS am_pm_ratio#2]
         +- BroadcastNestedLoopJoin BuildRight, Inner
            :- *(1) GpuColumnarToRow false
            :  +- GpuHashAggregate(keys=[], functions=[gpucount(1)], output=[amc#0L])
            :     +- ShuffleQueryStage 6
            :        +- GpuColumnarExchange gpusinglepartitioning(), false, [id=#1208]
            :           +- GpuHashAggregate(keys=[], functions=[partial_gpucount(1)], output=[count#70L])
            :              +- GpuProject
            :                 +- GpuBroadcastHashJoin [ws_web_page_sk#16], [wp_web_page_sk#54], Inner, BuildRight
            :                    :- GpuProject [ws_web_page_sk#16]
            :                    :  +- GpuBroadcastHashJoin [ws_sold_time_sk#5], [t_time_sk#44], Inner, BuildRight
            :                    :     :- GpuProject [ws_sold_time_sk#5, ws_web_page_sk#16]
            :                    :     :  +- GpuBroadcastHashJoin [ws_ship_hdemo_sk#14], [hd_demo_sk#39], Inner, BuildRight
            :                    :     :     :- GpuProject [ws_sold_time_sk#5, ws_ship_hdemo_sk#14, ws_web_page_sk#16]
            :                    :     :     :  +- GpuCoalesceBatches TargetSize(2147483648)
            :                    :     :     :     +- GpuFilter ((gpuisnotnull(ws_ship_hdemo_sk#14) AND gpuisnotnull(ws_sold_time_sk#5)) AND gpuisnotnull(ws_web_page_sk#16))
            :                    :     :     :        +- GpuFileGpuScan parquet default.web_sales[ws_sold_time_sk#5,ws_ship_hdemo_sk#14,ws_web_page_sk#16,ws_sold_date_sk#38] Batched: true, DataFilters: [isnotnull(ws_ship_hdemo_sk#14), isnotnull(ws_sold_time_sk#5), isnotnull(ws_web_page_sk#16)], Format: Parquet, Location: CatalogFileIndex[hdfs://instance-p3x8adaq-01:9000/user/work/data_10T_gpu/web_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ws_ship_hdemo_sk), IsNotNull(ws_sold_time_sk), IsNotNull(ws_web_page_sk)], ReadSchema: struct<ws_sold_time_sk:int,ws_ship_hdemo_sk:int,ws_web_page_sk:int>
            :                    :     :     +- BroadcastQueryStage 0
            :                    :     :        +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#554]
            :                    :     :           +- GpuProject [hd_demo_sk#39]
            :                    :     :              +- GpuCoalesceBatches TargetSize(2147483648)
            :                    :     :                 +- GpuFilter ((gpuisnotnull(hd_dep_count#42L) AND (hd_dep_count#42L = 4)) AND gpuisnotnull(hd_demo_sk#39))
            :                    :     :                    +- GpuFileGpuScan parquet default.household_demographics[hd_demo_sk#39,hd_dep_count#42L] Batched: true, DataFilters: [isnotnull(hd_dep_count#42L), (hd_dep_count#42L = 4), isnotnull(hd_demo_sk#39)], Format: Parquet, Location: InMemoryFileIndex[hdfs://instance-p3x8adaq-01:9000/user/work/data_10T_gpu/household_demographics], PartitionFilters: [], PushedFilters: [IsNotNull(hd_dep_count), EqualTo(hd_dep_count,4), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:int,hd_dep_count:bigint>
            :                    :     +- BroadcastQueryStage 1
            :                    :        +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#569]
            :                    :           +- GpuProject [t_time_sk#44]
            :                    :              +- GpuCoalesceBatches TargetSize(2147483648)
            :                    :                 +- GpuFilter (((gpuisnotnull(t_hour#47L) AND (t_hour#47L >= 6)) AND (t_hour#47L <= 7)) AND gpuisnotnull(t_time_sk#44))
            :                    :                    +- GpuFileGpuScan parquet default.time_dim[t_time_sk#44,t_hour#47L] Batched: true, DataFilters: [isnotnull(t_hour#47L), (t_hour#47L >= 6), (t_hour#47L <= 7), isnotnull(t_time_sk#44)], Format: Parquet, Location: InMemoryFileIndex[hdfs://instance-p3x8adaq-01:9000/user/work/data_10T_gpu/time_dim], PartitionFilters: [], PushedFilters: [IsNotNull(t_hour), GreaterThanOrEqual(t_hour,6), LessThanOrEqual(t_hour,7), IsNotNull(t_time_sk)], ReadSchema: struct<t_time_sk:int,t_hour:bigint>
            :                    +- BroadcastQueryStage 2
            :                       +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#584]
            :                          +- GpuProject [wp_web_page_sk#54]
            :                             +- GpuCoalesceBatches TargetSize(2147483648)
            :                                +- GpuFilter (((gpuisnotnull(wp_char_count#64L) AND (wp_char_count#64L >= 5000)) AND (wp_char_count#64L <= 5200)) AND gpuisnotnull(wp_web_page_sk#54))
            :                                   +- GpuFileGpuScan parquet default.web_page[wp_web_page_sk#54,wp_char_count#64L] Batched: true, DataFilters: [isnotnull(wp_char_count#64L), (wp_char_count#64L >= 5000), (wp_char_count#64L <= 5200), isnotnul..., Format: Parquet, Location: InMemoryFileIndex[hdfs://instance-p3x8adaq-01:9000/user/work/data_10T_gpu/web_page], PartitionFilters: [], PushedFilters: [IsNotNull(wp_char_count), GreaterThanOrEqual(wp_char_count,5000), LessThanOrEqual(wp_char_count,..., ReadSchema: struct<wp_web_page_sk:int,wp_char_count:bigint>
            +- *(2) GpuColumnarToRow false
               +- BroadcastQueryStage 8
                  +- GpuBroadcastExchange IdentityBroadcastMode, [id=#1378]
                     +- GpuHashAggregate(keys=[], functions=[gpucount(1)], output=[pmc#1L])
                        +- ShuffleQueryStage 7
                           +- GpuColumnarExchange gpusinglepartitioning(), false, [id=#1256]
                              +- GpuHashAggregate(keys=[], functions=[partial_gpucount(1)], output=[count#72L])
                                 +- GpuProject
                                    +- GpuBroadcastHashJoin [ws_web_page_sk#16], [wp_web_page_sk#54], Inner, BuildRight
                                       :- GpuProject [ws_web_page_sk#16]
                                       :  +- GpuBroadcastHashJoin [ws_sold_time_sk#5], [t_time_sk#44], Inner, BuildRight
                                       :     :- GpuProject [ws_sold_time_sk#5, ws_web_page_sk#16]
                                       :     :  +- GpuBroadcastHashJoin [ws_ship_hdemo_sk#14], [hd_demo_sk#39], Inner, BuildRight
                                       :     :     :- GpuProject [ws_sold_time_sk#5, ws_ship_hdemo_sk#14, ws_web_page_sk#16]
                                       :     :     :  +- GpuCoalesceBatches TargetSize(2147483648)
                                       :     :     :     +- GpuFilter ((gpuisnotnull(ws_ship_hdemo_sk#14) AND gpuisnotnull(ws_sold_time_sk#5)) AND gpuisnotnull(ws_web_page_sk#16))
                                       :     :     :        +- GpuFileGpuScan parquet default.web_sales[ws_sold_time_sk#5,ws_ship_hdemo_sk#14,ws_web_page_sk#16,ws_sold_date_sk#38] Batched: true, DataFilters: [isnotnull(ws_ship_hdemo_sk#14), isnotnull(ws_sold_time_sk#5), isnotnull(ws_web_page_sk#16)], Format: Parquet, Location: CatalogFileIndex[hdfs://instance-p3x8adaq-01:9000/user/work/data_10T_gpu/web_sales], PartitionFilters: [], PushedFilters: [IsNotNull(ws_ship_hdemo_sk), IsNotNull(ws_sold_time_sk), IsNotNull(ws_web_page_sk)], ReadSchema: struct<ws_sold_time_sk:int,ws_ship_hdemo_sk:int,ws_web_page_sk:int>
                                       :     :     +- BroadcastQueryStage 3
                                       :     :        +- ReusedExchange [hd_demo_sk#39], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#554]
                                       :     +- BroadcastQueryStage 4
                                       :        +- GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#606]
                                       :           +- GpuProject [t_time_sk#44]
                                       :              +- GpuCoalesceBatches TargetSize(2147483648)
                                       :                 +- GpuFilter (((gpuisnotnull(t_hour#47L) AND (t_hour#47L >= 16)) AND (t_hour#47L <= 17)) AND gpuisnotnull(t_time_sk#44))
                                       :                    +- GpuFileGpuScan parquet default.time_dim[t_time_sk#44,t_hour#47L] Batched: true, DataFilters: [isnotnull(t_hour#47L), (t_hour#47L >= 16), (t_hour#47L <= 17), isnotnull(t_time_sk#44)], Format: Parquet, Location: InMemoryFileIndex[hdfs://instance-p3x8adaq-01:9000/user/work/data_10T_gpu/time_dim], PartitionFilters: [], PushedFilters: [IsNotNull(t_hour), GreaterThanOrEqual(t_hour,16), LessThanOrEqual(t_hour,17), IsNotNull(t_time_sk)], ReadSchema: struct<t_time_sk:int,t_hour:bigint>
                                       +- BroadcastQueryStage 5
                                          +- ReusedExchange [wp_web_page_sk#54], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#584]

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:160)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.$anonfun$materialize$1(QueryStageExec.scala:79)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:79)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4(AdaptiveSparkPlanExec.scala:175)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$4$adapted(AdaptiveSparkPlanExec.scala:173)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:173)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:159)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:273)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:172)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.hive.SparkSessionWrapper$.$anonfun$runCommand$1(SparkSessionWrapper.scala:475)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.hive.SparkSessionWrapper$.runCommand(SparkSessionWrapper.scala:475)
	at org.apache.spark.sql.hive.SparkSessionWrapper$.insertIntoTable(SparkSessionWrapper.scala:459)
	at org.apache.spark.sql.hive.SparkSessionWrapper$.saveAsTable(SparkSessionWrapper.scala:251)
	at com.baidu.inf.bbs.worker.runner.QueryJobRunner.$anonfun$sink$1(QueryJobRunner.scala:268)
	at com.baidu.inf.bbs.worker.runner.QueryJobRunner.$anonfun$sink$1$adapted(QueryJobRunner.scala:266)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at com.baidu.inf.bbs.worker.runner.QueryJobRunner.sink(QueryJobRunner.scala:266)
	at com.baidu.inf.bbs.worker.runner.QueryJobRunner.runJob(QueryJobRunner.scala:292)
	at com.baidu.inf.bbs.worker.runner.SparkJobRunner.com$baidu$inf$bbs$worker$runner$SparkJobRunner$$processJob(SparkJobRunner.scala:177)
	at com.baidu.inf.bbs.worker.runner.SparkJobRunner.run(SparkJobRunner.scala:157)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: WholeStageCodegen (2) does not implement doExecuteBroadcast
	at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:293)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:188)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:184)
	at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:341)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.doExecute(limit.scala:203)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:106)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:106)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:110)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:109)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:160)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 51 more

@jlowe jlowe added P0 Must have for release and removed ? - Needs Triage Need team to review and classify labels Oct 28, 2020
@jlowe jlowe changed the title UnsupportedOperationException: WholeStageCodegen (2) does not implement doExecuteBroadcast when run tpc-ds query-90 with AQE ON [BUG] [BUG] TPC-DS query 90 with AQE enabled fails with doExecuteBroadcast exception Oct 28, 2020
@andygrove andygrove self-assigned this Oct 28, 2020
@andygrove
Contributor

I have been able to reproduce this. I will update the issue once I understand the cause.

@andygrove
Contributor

The root cause of q90 failing when BroadcastNestedLoopJoin and AQE are enabled is that the BroadcastNestedLoopJoinMeta class relied on calling the canThisBeReplaced method on the build side of the join. This works correctly when the build side is a BroadcastExchangeExec node, but not when the build side is a BroadcastQueryStageExec, as is the case when AQE is enabled.
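To illustrate the failure mode described above, here is a minimal sketch using simplified stand-in classes (these stubs are not the actual Spark or spark-rapids types; only the wrapping pattern is real). With AQE on, a materialized broadcast exchange is wrapped in a query-stage node, so a check that only asks the immediate build-side node whether it can be replaced never sees the GPU exchange underneath:

```scala
// Stand-in hierarchy: with AQE, the build side of a broadcast join is a
// query-stage wrapper around the exchange, not the exchange itself.
sealed trait PlanStub { def canThisBeReplaced: Boolean = false }
case class GpuBroadcastExchangeStub() extends PlanStub {
  override def canThisBeReplaced: Boolean = true
}
case class CpuBroadcastExchangeStub() extends PlanStub
// AQE wraps a materialized broadcast exchange in a query stage node.
case class BroadcastQueryStageStub(plan: PlanStub) extends PlanStub

// Naive check: asks the build-side node directly. A query-stage wrapper
// reports false even when it wraps a GPU broadcast exchange.
def naiveBuildSideOnGpu(buildSide: PlanStub): Boolean =
  buildSide.canThisBeReplaced

// Fixed check: unwrap an AQE query stage first, then inspect the exchange.
def fixedBuildSideOnGpu(buildSide: PlanStub): Boolean = buildSide match {
  case BroadcastQueryStageStub(wrapped) => wrapped.canThisBeReplaced
  case other                            => other.canThisBeReplaced
}

object Demo {
  def main(args: Array[String]): Unit = {
    val aqeBuildSide = BroadcastQueryStageStub(GpuBroadcastExchangeStub())
    // The naive check misses the GPU exchange; the fixed check sees through
    // the query-stage wrapper.
    println(naiveBuildSideOnGpu(aqeBuildSide))
    println(fixedBuildSideOnGpu(aqeBuildSide))
  }
}
```

Under this model, the naive check causes the plugin to conclude the build side cannot stay on the GPU, leaving a mixed plan where a CPU BroadcastNestedLoopJoin receives a GPU broadcast it cannot execute, which matches the doExecuteBroadcast failure in the pasted trace.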

tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
…IDIA#1035)

Signed-off-by: spark-rapids automation <70000568+nvauto@users.noreply.github.com>