Add preliminary test and test framework changes for ExistanceJoin #4697

Merged
Changes from 3 commits
42 changes: 38 additions & 4 deletions integration_tests/src/main/python/join_test.py
@@ -15,7 +15,7 @@
import pytest
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *
-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
from conftest import is_databricks_runtime, is_emr_runtime
from data_gen import *
from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan
@@ -361,7 +361,7 @@ def do_join(spark):
def test_right_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size):
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 25)
        # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
        # if the sizes are large enough to have both 0.0 and -0.0 show up (500 and 250
        # are large enough), but those take a long time to verify, so by default we run
        # with smaller numbers that do not expose the error.
@@ -651,15 +651,15 @@ def do_join(spark):

    if (cache_side == 'cache_left'):
        # Try to force the shuffle to be split between CPU and GPU for the join.
        # By default, if the operation after the shuffle is not on the GPU, then
        # no GPU shuffle is done; so do something simple after the repartition
        # to make sure that the GPU shuffle is used.
        left = left.repartition('a').selectExpr('b + 1 as b', 'a').cache()
        left.count() # populate the cache
    else:
        # cache_right
        # Try to force the shuffle to be split between CPU and GPU for the join.
        # By default, if the operation after the shuffle is not on the GPU, then
        # no GPU shuffle is done; so do something simple after the repartition
        # to make sure that the GPU shuffle is used.
        right = right.repartition('r_a').selectExpr('c + 1 as c', 'r_a').cache()
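
For context, the comments above describe a workaround rather than an API: a repartition whose immediately following operation can run on the GPU keeps the shuffle itself on the GPU. A minimal standalone Scala sketch of that pattern (the session setup, DataFrame, and column names are illustrative; actually getting a GPU shuffle assumes the RAPIDS plugin is installed and enabled, which this sketch does not configure):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object CacheShuffleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-shuffle-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10L), (2, 20L), (3, 30L)).toDF("a", "b")

    // Repartition by the join key, then add a trivial projection so the
    // operation right after the shuffle can stay on the GPU, then cache.
    val left = df.repartition(col("a")).selectExpr("b + 1 as b", "a").cache()
    left.count() // materialize the cache before the join runs

    spark.stop()
  }
}
```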
@@ -785,3 +785,37 @@ def do_join(spark):
        return spark.sql("select a.* from {} a, {} b where a.name=b.name".format(
            resultdf_name, resultdf_name))
    assert_gpu_and_cpu_are_equal_collect(do_join)

# ExistenceJoin arises from existential subqueries (which are normally rewritten to a SemiJoin)
# when an additional condition may qualify left-side records even though they have no matching
# records on the right.
#
# The query is then rewritten roughly as a LeftOuter join with an extra Boolean column "exists",
# which feeds into a filter "exists OR someOtherPredicate".
# If the extra condition is AND-ed instead, the result is a subset of a SemiJoin, and
# the optimizer won't use ExistenceJoin.
@ignore_order(local=True)
@pytest.mark.parametrize(
    "allowFallback", [
        pytest.param('true',
            marks=pytest.mark.allow_non_gpu('SortMergeJoinExec')),
        pytest.param('false',
            marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/589"))
    ], ids=idfn
)
def test_existence_join(allowFallback, spark_tmp_table_factory):
    leftTable = spark_tmp_table_factory.get()
    rightTable = spark_tmp_table_factory.get()
    def do_join(spark):
        # create partially overlapping ranges to get a mix of exists=true and exists=false
        spark.createDataFrame([v] for v in range(2, 10)).createOrReplaceTempView(leftTable)
        spark.createDataFrame([v] for v in range(0, 8)).createOrReplaceTempView(rightTable)
        res = spark.sql((
            "select * "
            "from {} as l "
            "where l._1 < 0 "
            "    OR l._1 in (select * from {} as r)"
        ).format(leftTable, rightTable))
        return res
    assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, r".+Join ExistenceJoin\(exists#[0-9]+\).+")
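
To make the rewrite described in the comment above concrete, here is a minimal, self-contained Scala sketch (table names, data, and the exact plan text are illustrative; the plan string varies by Spark version) that produces an ExistenceJoin in the optimized plan:

```scala
import org.apache.spark.sql.SparkSession

object ExistenceJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("existence-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Partially overlapping ranges, mirroring the test above.
    (2 until 10).toDF("v").createOrReplaceTempView("left_tbl")
    (0 until 8).toDF("v").createOrReplaceTempView("right_tbl")

    // The OR between a plain predicate and the IN-subquery is what keeps the
    // optimizer from collapsing this into a LeftSemi join.
    val df = spark.sql(
      "SELECT * FROM left_tbl l WHERE l.v < 0 OR l.v IN (SELECT v FROM right_tbl)")

    // Expect something like: Join ExistenceJoin(exists#N), (v#A = v#B)
    // feeding a Filter ((v#A < 0) OR exists#N).
    println(df.queryExecution.optimizedPlan)
    spark.stop()
  }
}
```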

25 changes: 20 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.matching.Regex

import com.nvidia.spark.rapids.python.PythonWorkerSemaphore

@@ -298,7 +299,7 @@
}
}

-object ExecutionPlanCaptureCallback {
+object ExecutionPlanCaptureCallback extends Logging {
  private[this] val shouldCapture: AtomicBoolean = new AtomicBoolean(false)
  private[this] val execPlan: AtomicReference[SparkPlan] = new AtomicReference[SparkPlan]()

@@ -358,6 +359,7 @@
}

  def assertContains(gpuPlan: SparkPlan, className: String): Unit = {
+    implicit val regexMap = scala.collection.mutable.Map.empty[String, Try[Regex]]
    assert(containsPlan(gpuPlan, className),
      s"Could not find $className in the Spark plan\n$gpuPlan")
  }
@@ -368,6 +370,7 @@
}

  def assertNotContain(gpuPlan: SparkPlan, className: String): Unit = {
+    implicit val regexMap = scala.collection.mutable.Map.empty[String, Try[Regex]]
    assert(!containsPlan(gpuPlan, className),
      s"We found $className in the Spark plan\n$gpuPlan")
  }
@@ -391,13 +394,17 @@
    executedPlan.expressions.exists(didFallBack(_, fallbackCpuClass))
  }

-  private def containsExpression(exp: Expression, className: String): Boolean = exp.find {
+  private def containsExpression(exp: Expression, className: String)(
+      implicit regexMap: scala.collection.mutable.Map[String, Try[Regex]]
+  ): Boolean = exp.find {
    case e if PlanUtils.getBaseNameFromClass(e.getClass.getName) == className => true
    case e: ExecSubqueryExpression => containsPlan(e.plan, className)
    case _ => false
  }.nonEmpty

-  private def containsPlan(plan: SparkPlan, className: String): Boolean = plan.find {
+  private def containsPlan(plan: SparkPlan, className: String)(
+      implicit regexMap: scala.collection.mutable.Map[String, Try[Regex]]
+  ): Boolean = plan.find {
    case p if PlanUtils.sameClass(p, className) =>
      true
    case p: AdaptiveSparkPlanExec =>
@@ -408,8 +415,16 @@
      containsPlan(p.child, className)
    case p: ReusedExchangeExec =>
      containsPlan(p.child, className)
-    case p =>
-      p.expressions.exists(containsExpression(_, className))
+    case p if p.expressions.exists(containsExpression(_, className)) =>
+      true
+    case p: SparkPlan =>
+      regexMap.getOrElseUpdate(className,
+        Try(className.r).recoverWith { case t: Throwable =>
+          logWarning(s"Regex broken in a simpleStringWithNodeId match, $t")
+          scala.util.Failure(t)
+        })
+        .toOption
+        .flatMap(_.findFirstIn(p.simpleStringWithNodeId())).nonEmpty
  }.nonEmpty
}
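
The new containsPlan fallback treats className as a regular expression and matches it against each node's simpleStringWithNodeId(), which is what lets the Python test above assert on "Join ExistenceJoin\(exists#[0-9]+\)". Caching Try[Regex] (rather than Regex) in the implicit map means a malformed pattern is compiled, and warned about, only once instead of on every plan node. A standalone sketch of just that caching pattern, with illustrative names outside the plugin's API:

```scala
import scala.collection.mutable
import scala.util.{Failure, Try}
import scala.util.matching.Regex

object RegexCacheSketch {
  // Cache of pattern string -> compiled regex (or the compilation failure).
  private val regexMap = mutable.Map.empty[String, Try[Regex]]

  def matches(pattern: String, nodeString: String): Boolean =
    regexMap.getOrElseUpdate(pattern,
      Try(pattern.r).recoverWith { case t: Throwable =>
        // Warn once; the cached Failure suppresses repeat compilation.
        Console.err.println(s"Regex broken in a plan-string match, $t")
        Failure(t)
      })
      .toOption
      .flatMap(_.findFirstIn(nodeString))
      .nonEmpty

  def main(args: Array[String]): Unit = {
    println(matches("""Join ExistenceJoin\(exists#[0-9]+\)""",
      "Join ExistenceJoin(exists#42), (v#1 = v#2)")) // true
    println(matches("[unclosed", "anything"))         // false; warns once
    println(matches("[unclosed", "anything"))         // false; no second warning
  }
}
```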
