[BUG] TpchLike query 2 fails when AQE is enabled #275

Closed
andygrove opened this issue Jun 24, 2020 · 7 comments · Fixed by #462
Labels: bug (Something isn't working)

Comments


andygrove commented Jun 24, 2020

Describe the bug
TpchLike query 2 fails when AQE is enabled

Steps/Code to reproduce bug
TpchLike query 2 is currently not run when AQE is enabled. To reproduce, remove the conditional code in the Scala test that disables it, and modify the Python test to run with both AQE and non-AQE configs, just like all the other tests.
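In spirit, covering both configurations looks like this (a minimal sketch; `run_with_confs` and the callable it takes are hypothetical names, not the repo's actual test utilities):

```python
# Illustrative sketch only: the spark-rapids integration tests use their
# own fixtures and helpers. This just shows running the same query once
# per AQE setting instead of skipping the AQE case.
AQE_CONFS = [
    {"spark.sql.adaptive.enabled": "false"},
    {"spark.sql.adaptive.enabled": "true"},
]

def run_with_confs(run_query):
    """Run `run_query` (a callable taking a conf dict) once per AQE setting."""
    return [run_query(conf) for conf in AQE_CONFS]
```

In the real test suite, `run_query` would build a session with the given conf and compare GPU results against CPU results.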

Expected behavior
The test should execute without error and produce the correct row count.

Environment details (please complete the following information)
Running mvn clean verify on desktop (Ubuntu).

Additional context
N/A

@andygrove andygrove added bug Something isn't working ? - Needs Triage Need team to review and classify labels Jun 24, 2020
@andygrove andygrove self-assigned this Jun 24, 2020
@sameerz sameerz removed the ? - Needs Triage Need team to review and classify label Jun 29, 2020
@andygrove andygrove added this to the Aug 3 - Aug 14 milestone Aug 5, 2020
andygrove commented:

This is the error that happens, intermittently. The working theory is that there is a race condition driven by the order in which concurrent query stages finish.

Internal Error class com.nvidia.spark.rapids.shims.spark301.GpuBroadcastExchangeExec has column support mismatch:
GpuBroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[0, double, false])), input[1, bigint, true])), [id=#1832]
+- GpuCoalesceBatches TargetSize(2147483647)
   +- GpuFilter gpuisnotnull(min(ps_supplycost)#196)
      +- GpuHashAggregate(keys=[ps_partkey#92L], functions=[gpumin(ps_supplycost#95)], output=[min(ps_supplycost)#196, ps_partkey#92L#216L])
         +- GpuCoalesceBatches TargetSize(9223372036854775807)
            +- GpuCustomShuffleReader coalesced
               +- ShuffleQueryStage 7
                  +- GpuColumnarExchange gpuhashpartitioning(ps_partkey#92L, 2), true, [id=#1671]
                     +- GpuHashAggregate(keys=[ps_partkey#92L], functions=[partial_gpumin(ps_supplycost#95)], output=[ps_partkey#92L, min#220])
                        +- GpuProject [ps_partkey#92L, ps_supplycost#95]
                           +- GpuBroadcastHashJoin [n_regionkey#68L], [r_regionkey#102L], Inner, BuildRight
                              :- GpuProject [ps_partkey#92L, ps_supplycost#95, n_regionkey#68L]
                              :  +- GpuBroadcastHashJoin [s_nationkey#111L], [n_nationkey#66L], Inner, BuildRight
                              :     :- GpuProject [ps_partkey#92L, ps_supplycost#95, s_nationkey#111L]
                              :     :  +- GpuBroadcastHashJoin [ps_suppkey#93L], [s_suppkey#108L], Inner, BuildRight
                              :     :     :- GpuProject [ps_partkey#92L, ps_suppkey#93L, ps_supplycost#95]
                              :     :     :  +- GpuCoalesceBatches TargetSize(2147483647)
                              :     :     :     +- GpuFilter (gpuisnotnull(ps_suppkey#93L) AND gpuisnotnull(ps_partkey#92L))
                              :     :     :        +- GpuFileGpuScan parquet [ps_partkey#92L,ps_suppkey#93L,ps_supplycost#95] Batched: true, DataFilters: [isnotnull(ps_suppkey#93L), isnotnull(ps_partkey#92L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/andygrove/git/spark-rapids/integration_tests/src/test/resources/tpch..., PartitionFilters: [], PushedFilters: [IsNotNull(ps_suppkey), IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:bigint,ps_suppkey:bigint,ps_supplycost:double>
                              :     :     +- BroadcastQueryStage 1
                              :     :        +- GpuBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#618]
                              :     :           +- GpuProject [s_suppkey#108L, s_nationkey#111L]
                              :     :              +- GpuCoalesceBatches TargetSize(2147483647)
                              :     :                 +- GpuFilter (gpuisnotnull(s_suppkey#108L) AND gpuisnotnull(s_nationkey#111L))
                              :     :                    +- GpuFileGpuScan parquet [s_suppkey#108L,s_nationkey#111L] Batched: true, DataFilters: [isnotnull(s_suppkey#108L), isnotnull(s_nationkey#111L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/andygrove/git/spark-rapids/integration_tests/src/test/resources/tpch..., PartitionFilters: [], PushedFilters: [IsNotNull(s_suppkey), IsNotNull(s_nationkey)], ReadSchema: struct<s_suppkey:bigint,s_nationkey:bigint>
                              :     +- BroadcastQueryStage 2
                              :        +- GpuBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#634]
                              :           +- GpuProject [n_nationkey#66L, n_regionkey#68L]
                              :              +- GpuCoalesceBatches TargetSize(2147483647)
                              :                 +- GpuFilter (gpuisnotnull(n_nationkey#66L) AND gpuisnotnull(n_regionkey#68L))
                              :                    +- GpuFileGpuScan parquet [n_nationkey#66L,n_regionkey#68L] Batched: true, DataFilters: [isnotnull(n_nationkey#66L), isnotnull(n_regionkey#68L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/andygrove/git/spark-rapids/integration_tests/src/test/resources/tpch..., PartitionFilters: [], PushedFilters: [IsNotNull(n_nationkey), IsNotNull(n_regionkey)], ReadSchema: struct<n_nationkey:bigint,n_regionkey:bigint>
                              +- BroadcastQueryStage 3
                                 +- GpuBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#650]
                                    +- GpuProject [r_regionkey#102L]
                                       +- GpuCoalesceBatches TargetSize(2147483647)
                                          +- GpuFilter ((gpuisnotnull(r_name#103) AND (r_name#103 = EUROPE)) AND gpuisnotnull(r_regionkey#102L))
                                             +- GpuFileGpuScan parquet [r_regionkey#102L,r_name#103] Batched: true, DataFilters: [isnotnull(r_name#103), (r_name#103 = EUROPE), isnotnull(r_regionkey#102L)], Format: Parquet, Location: InMemoryFileIndex[file:/home/andygrove/git/spark-rapids/integration_tests/src/test/resources/tpch..., PartitionFilters: [], PushedFilters: [IsNotNull(r_name), EqualTo(r_name,EUROPE), IsNotNull(r_regionkey)], ReadSchema: struct<r_regionkey:bigint,r_name:string>

java.lang.IllegalStateException: Internal Error class com.nvidia.spark.rapids.shims.spark301.GpuBroadcastExchangeExec has column support mismatch: (plan repeated as above)

	at org.apache.spark.sql.execution.SparkPlan.doExecuteColumnar(SparkPlan.scala:303)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecuteColumnar(QueryStageExec.scala:117)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
	at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.$anonfun$call$2(GpuBroadcastExchangeExec.scala:336)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:136)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:134)
	at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.call(GpuBroadcastExchangeExec.scala:321)
	at org.apache.spark.sql.rapids.execution.GpuBroadcastExchangeExecBase$$anon$1.call(GpuBroadcastExchangeExec.scala:302)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

andygrove commented:

By controlling the order in which stages finish, I can now reproduce this reliably. The error occurs when the plan contains a broadcast exchange wrapping a broadcast query stage:

GpuBroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[0, double, true])), input[1, bigint, true])), [id=#1802]
+- BroadcastQueryStage 8
   +- GpuBroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[0, double, false])), input[1, bigint, true])), [id=#1692]


andygrove commented Aug 6, 2020

The issue is a difference in the nullability of a referenced field between the required child partitioning and the actual child partitioning (input[0, double, true] vs. input[0, double, false]); this causes an additional broadcast exchange to be inserted between the join and the broadcast query stage. I am still working on understanding why this is happening.

The attribute being referenced here is the result of a MIN aggregate function.
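In miniature, the mismatch looks like this (a toy model, not Spark's actual classes): the broadcast mode required by the parent is compared against the one the existing query stage provides, and because that comparison includes attribute nullability, the two modes compare unequal and AQE plans a second exchange.

```python
from dataclasses import dataclass

# Toy model of the comparison; AttrRef and HashedRelationBroadcastMode
# here are simplified stand-ins for Spark's internal classes.
@dataclass(frozen=True)
class AttrRef:
    ordinal: int
    data_type: str
    nullable: bool

@dataclass(frozen=True)
class HashedRelationBroadcastMode:
    keys: tuple

# What the parent join requires (nullable double key)...
required = HashedRelationBroadcastMode(
    keys=(AttrRef(0, "double", nullable=True), AttrRef(1, "bigint", nullable=True))
)
# ...vs. what the existing BroadcastQueryStage advertises (non-nullable).
actual = HashedRelationBroadcastMode(
    keys=(AttrRef(0, "double", nullable=False), AttrRef(1, "bigint", nullable=True))
)

# The modes compare unequal purely because of nullability, so an extra
# broadcast exchange would be inserted around the existing stage.
needs_extra_exchange = required != actual
```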

andygrove commented:

@jlowe @revans2 I was wondering if you might have any insights on what might cause this?


revans2 commented Aug 7, 2020

It is likely a difference between our code and the original code. We replace operators in the physical plan after most of the planning has completed. SparkPlan and Expression both have many APIs used during planning, but because we substitute so late in the process, those APIs are never called on our versions of the plan (at least with AQE off). My guess is that we have a bug in one of them and it is showing up here. I would compare the output attributes for the different parts of the plan. Perhaps we can insert an assertion or two to verify that, when we replace part of the plan, its output matches what is expected.
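The suggested sanity check, sketched abstractly (not the plugin's actual API; `output` here stands in for SparkPlan.output, i.e. name, type, and nullability per attribute):

```python
# Hypothetical check: when swapping a CPU operator for its GPU version,
# assert the replacement advertises the same output schema, including
# nullability, so mismatches surface at replacement time rather than
# later as an AQE "column support mismatch" failure.
def check_replacement(cpu_output, gpu_output):
    """Raise if a replaced operator changed its advertised output schema."""
    if cpu_output != gpu_output:
        raise AssertionError(
            "column support mismatch after replacement: "
            f"expected {cpu_output}, got {gpu_output}"
        )

# Example: a MIN aggregate whose GPU version wrongly reports its result
# as non-nullable (the pattern seen in this issue).
cpu = [("min(ps_supplycost)", "double", True)]
gpu = [("min(ps_supplycost)", "double", False)]
```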

andygrove commented:

Thanks for the pointer @revans2. I tracked this down to some bugs in GpuFilterExec. I will write up a separate issue for that.

andygrove commented:

Root cause: #525

pxLi pushed a commit to pxLi/spark-rapids that referenced this issue May 12, 2022
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023