Fix the incomplete capture of SubqueryBroadcast (#4630)
Signed-off-by: sperlingxx <lovedreamf@gmail.com>

Fixes #4625

This PR fixes the logic for searching/matching potential SubqueryBroadcastExec nodes in the partition filters of scans. Previously, we missed one possible shape: as a subquery, a SubqueryBroadcastExec may be reused, in which case it appears wrapped in a ReusedSubqueryExec. Missing that case led to a crash when AQE is enabled.
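For context, the dynamic-pruning filter on a scan can carry the broadcast subquery in either of two shapes, and the second one was previously dropped. Below is a minimal sketch of the matching, not the plugin's actual code: the `captureBroadcasts` helper and its `onBroadcast` callback are illustrative stand-ins for the CPU-to-GPU conversion done in the diffs that follow.

import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.execution.{InSubqueryExec, ReusedSubqueryExec, SubqueryBroadcastExec}

// Sketch only: walk one partition filter and surface its SubqueryBroadcastExec,
// whether it appears directly or wrapped in a ReusedSubqueryExec (the shape
// this commit adds). `onBroadcast` stands in for the CPU-to-GPU conversion.
def captureBroadcasts(filter: Expression)(onBroadcast: SubqueryBroadcastExec => Unit): Unit = {
  filter.foreach {
    case DynamicPruningExpression(inSub: InSubqueryExec) =>
      inSub.plan match {
        case bc: SubqueryBroadcastExec =>
          onBroadcast(bc) // direct subquery: already handled before this fix
        case ReusedSubqueryExec(bc: SubqueryBroadcastExec) =>
          onBroadcast(bc) // reused subquery: the previously missed case
        case _ => // leave other subquery plans untouched
      }
    case _ => // not a dynamic-pruning expression
  }
}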
sperlingxx authored Jan 27, 2022
1 parent 16d6668 commit 30096e7
Showing 6 changed files with 111 additions and 47 deletions.
21 changes: 20 additions & 1 deletion integration_tests/src/main/python/dpp_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2021, NVIDIA CORPORATION.
+# Copyright (c) 2021-2022, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -100,6 +100,25 @@ def fn(spark):
             WHERE dim.filter = {2}
             GROUP BY fact.key, fact.skey, fact.ex_key
             ''',
+    # This query checks the pattern of reused broadcast subquery: ReusedSubquery(SubqueryBroadcast(...))
+    # https://github.com/NVIDIA/spark-rapids/issues/4625
+    """
+    SELECT key, max(value)
+    FROM (
+        SELECT fact.key as key, fact.value as value
+        FROM {0} fact
+        JOIN {1} dim
+        ON fact.key = dim.key
+        WHERE dim.filter = {2}
+    UNION ALL
+        SELECT fact.key as key, fact.value as value
+        FROM {0} fact
+        JOIN {1} dim
+        ON fact.key = dim.key
+        WHERE dim.filter = {2}
+    )
+    GROUP BY key
+    """
 ]
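The two UNION ALL branches plan identical broadcast subqueries, so Spark's subquery-reuse optimization replaces the second with a ReusedSubqueryExec pointing at the first. Below is a rough, simplified sketch of that deduplication; Spark's actual ReuseSubquery rule is more involved, and `dedupSubqueries` is a hypothetical helper used only to illustrate the mechanism.

import scala.collection.mutable
import org.apache.spark.sql.execution.{BaseSubqueryExec, ReusedSubqueryExec, SparkPlan}

// Simplified sketch (not Spark's actual ReuseSubquery rule): the first
// occurrence of a subquery plan is kept; any later subquery with the same
// canonical form is replaced by a ReusedSubqueryExec pointing at the first.
// This is how the UNION ALL query above ends up with the
// ReusedSubquery(SubqueryBroadcast(...)) pattern the fix must handle.
def dedupSubqueries(subqueries: Seq[BaseSubqueryExec]): Seq[BaseSubqueryExec] = {
  val seen = mutable.Map.empty[SparkPlan, BaseSubqueryExec]
  subqueries.map { sub =>
    seen.get(sub.canonicalized) match {
      case Some(first) => ReusedSubqueryExec(first) // duplicate: point at the first
      case None =>
        seen.update(sub.canonicalized, sub) // first occurrence: keep as-is
        sub
    }
  }
}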


@@ -151,15 +151,24 @@ abstract class Spark30XdbShims extends Spark30XdbShimsBase with Logging {
     // FileSourceScan is independent from the replacement of the partitionFilters. It is
     // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters
     // are on the GPU. And vice versa.
-    private lazy val partitionFilters = wrapped.partitionFilters.map { filter =>
-      filter.transformDown {
-        case dpe @ DynamicPruningExpression(inSub: InSubqueryExec)
-            if inSub.plan.isInstanceOf[SubqueryBroadcastExec] =>
-
-          val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
-          subBcMeta.tagForExplain()
-          val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
-          dpe.copy(inSub.copy(plan = gpuSubBroadcast))
+    private lazy val partitionFilters = {
+      val convertBroadcast = (bc: SubqueryBroadcastExec) => {
+        val meta = GpuOverrides.wrapAndTagPlan(bc, conf)
+        meta.tagForExplain()
+        meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
+      }
+      wrapped.partitionFilters.map { filter =>
+        filter.transformDown {
+          case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) =>
+            inSub.plan match {
+              case bc: SubqueryBroadcastExec =>
+                dpe.copy(inSub.copy(plan = convertBroadcast(bc)))
+              case reuse @ ReusedSubqueryExec(bc: SubqueryBroadcastExec) =>
+                dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc))))
+              case _ =>
+                dpe
+            }
+        }
+      }
+    }

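Design note on the hunk above (the same change is applied verbatim in the Spark 30X, 31X, 31Xdb, and 320+ shims that follow): the ReusedSubqueryExec wrapper is preserved through `reuse.copy(...)`, so only the wrapped SubqueryBroadcastExec is swapped for its GPU conversion and the reuse relationship stays visible in the plan. The old guard, `if inSub.plan.isInstanceOf[SubqueryBroadcastExec]`, never matched the reused shape, which is what left the subquery unconverted and triggered the AQE crash reported in #4625.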
@@ -130,15 +130,24 @@ abstract class Spark30XShims extends Spark301until320Shims with Logging {
     // FileSourceScan is independent from the replacement of the partitionFilters. It is
     // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters
     // are on the GPU. And vice versa.
-    private lazy val partitionFilters = wrapped.partitionFilters.map { filter =>
-      filter.transformDown {
-        case dpe @ DynamicPruningExpression(inSub: InSubqueryExec)
-            if inSub.plan.isInstanceOf[SubqueryBroadcastExec] =>
-
-          val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
-          subBcMeta.tagForExplain()
-          val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
-          dpe.copy(inSub.copy(plan = gpuSubBroadcast))
+    private lazy val partitionFilters = {
+      val convertBroadcast = (bc: SubqueryBroadcastExec) => {
+        val meta = GpuOverrides.wrapAndTagPlan(bc, conf)
+        meta.tagForExplain()
+        meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
+      }
+      wrapped.partitionFilters.map { filter =>
+        filter.transformDown {
+          case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) =>
+            inSub.plan match {
+              case bc: SubqueryBroadcastExec =>
+                dpe.copy(inSub.copy(plan = convertBroadcast(bc)))
+              case reuse @ ReusedSubqueryExec(bc: SubqueryBroadcastExec) =>
+                dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc))))
+              case _ =>
+                dpe
+            }
+        }
+      }
+    }

@@ -370,15 +370,24 @@ abstract class Spark31XShims extends Spark301until320Shims with Logging {
     // FileSourceScan is independent from the replacement of the partitionFilters. It is
     // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters
     // are on the GPU. And vice versa.
-    private lazy val partitionFilters = wrapped.partitionFilters.map { filter =>
-      filter.transformDown {
-        case dpe @ DynamicPruningExpression(inSub: InSubqueryExec)
-            if inSub.plan.isInstanceOf[SubqueryBroadcastExec] =>
-
-          val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
-          subBcMeta.tagForExplain()
-          val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
-          dpe.copy(inSub.copy(plan = gpuSubBroadcast))
+    private lazy val partitionFilters = {
+      val convertBroadcast = (bc: SubqueryBroadcastExec) => {
+        val meta = GpuOverrides.wrapAndTagPlan(bc, conf)
+        meta.tagForExplain()
+        meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
+      }
+      wrapped.partitionFilters.map { filter =>
+        filter.transformDown {
+          case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) =>
+            inSub.plan match {
+              case bc: SubqueryBroadcastExec =>
+                dpe.copy(inSub.copy(plan = convertBroadcast(bc)))
+              case reuse @ ReusedSubqueryExec(bc: SubqueryBroadcastExec) =>
+                dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc))))
+              case _ =>
+                dpe
+            }
+        }
+      }
+    }

@@ -382,15 +382,24 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
     // FileSourceScan is independent from the replacement of the partitionFilters. It is
     // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters
     // are on the GPU. And vice versa.
-    private lazy val partitionFilters = wrapped.partitionFilters.map { filter =>
-      filter.transformDown {
-        case dpe @ DynamicPruningExpression(inSub: InSubqueryExec)
-            if inSub.plan.isInstanceOf[SubqueryBroadcastExec] =>
-
-          val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
-          subBcMeta.tagForExplain()
-          val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
-          dpe.copy(inSub.copy(plan = gpuSubBroadcast))
+    private lazy val partitionFilters = {
+      val convertBroadcast = (bc: SubqueryBroadcastExec) => {
+        val meta = GpuOverrides.wrapAndTagPlan(bc, conf)
+        meta.tagForExplain()
+        meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
+      }
+      wrapped.partitionFilters.map { filter =>
+        filter.transformDown {
+          case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) =>
+            inSub.plan match {
+              case bc: SubqueryBroadcastExec =>
+                dpe.copy(inSub.copy(plan = convertBroadcast(bc)))
+              case reuse @ ReusedSubqueryExec(bc: SubqueryBroadcastExec) =>
+                dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc))))
+              case _ =>
+                dpe
+            }
+        }
+      }
+    }

@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
-import org.apache.spark.sql.execution.{BaseSubqueryExec, CommandResultExec, FileSourceScanExec, InSubqueryExec, PartitionedFileUtil, SparkPlan, SubqueryBroadcastExec}
+import org.apache.spark.sql.execution.{BaseSubqueryExec, CommandResultExec, FileSourceScanExec, InSubqueryExec, PartitionedFileUtil, ReusedSubqueryExec, SparkPlan, SubqueryBroadcastExec}
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command._
@@ -534,15 +534,24 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
     // FileSourceScan is independent from the replacement of the partitionFilters. It is
     // possible that the FileSourceScan is on the CPU, while the dynamic partitionFilters
     // are on the GPU. And vice versa.
-    private lazy val partitionFilters = wrapped.partitionFilters.map { filter =>
-      filter.transformDown {
-        case dpe@DynamicPruningExpression(inSub: InSubqueryExec)
-            if inSub.plan.isInstanceOf[SubqueryBroadcastExec] =>
-
-          val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
-          subBcMeta.tagForExplain()
-          val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
-          dpe.copy(inSub.copy(plan = gpuSubBroadcast))
+    private lazy val partitionFilters = {
+      val convertBroadcast = (bc: SubqueryBroadcastExec) => {
+        val meta = GpuOverrides.wrapAndTagPlan(bc, conf)
+        meta.tagForExplain()
+        meta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
+      }
+      wrapped.partitionFilters.map { filter =>
+        filter.transformDown {
+          case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) =>
+            inSub.plan match {
+              case bc: SubqueryBroadcastExec =>
+                dpe.copy(inSub.copy(plan = convertBroadcast(bc)))
+              case reuse @ ReusedSubqueryExec(bc: SubqueryBroadcastExec) =>
+                dpe.copy(inSub.copy(plan = reuse.copy(convertBroadcast(bc))))
+              case _ =>
+                dpe
+            }
+        }
+      }
+    }
