Commit
Implement ExistenceJoin Iterator using an auxiliary left semijoin (#4796)
This PR implements an iterator for ExistenceJoin.

1. ExistenceJoin is computed by executing a left semijoin via cuDF. The lhs GatherMap produced by that semijoin is used to scatter `true` into a Boolean column of `lhs.numRows` rows that is initially all `false`; the rhs data is never gathered. See the sketch after this list.

2. The PR also fixes regex matching against SparkPlan node strings. The previously used simple string mentions ExistenceJoin in the CPU plan but does not print the ExistenceJoin type as part of the Join exec string in the GPU plan, so plan capture now matches against the verbose plan string instead.
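A minimal, CPU-side Scala sketch of the idea follows (plain collections instead of cuDF tables; `existsColumn`, `lhsKeys`, and `rhsKeys` are illustrative names, not plugin APIs): the left-semijoin gather map holds the lhs row indices that have at least one rhs match, and scattering `true` at those indices into an all-`false` column of `lhs.numRows` entries yields the `exists` column that ExistenceJoin appends to the lhs.

```scala
// Illustrative only: the GPU iterator performs the same steps with cuDF tables,
// a left-semijoin GatherMap, and a Boolean column scatter.
object ExistenceJoinSketch {
  def existsColumn(lhsKeys: Seq[Int], rhsKeys: Seq[Int]): Array[Boolean] = {
    // "Gather map" of the left semijoin: lhs row indices with at least one rhs match.
    val rhsKeySet = rhsKeys.toSet
    val semiJoinGatherMap = lhsKeys.indices.filter(i => rhsKeySet.contains(lhsKeys(i)))

    // Boolean column of lhs.numRows entries, initially all false ...
    val exists = Array.fill(lhsKeys.length)(false)
    // ... into which `true` is scattered at the matching lhs row indices.
    semiJoinGatherMap.foreach(i => exists(i) = true)
    exists // note: the rhs data itself is never gathered
  }

  def main(args: Array[String]): Unit = {
    // lhs keys 2..9 against rhs keys 0..7: only the lhs rows with keys 8 and 9 lack a match.
    println(existsColumn(2 to 9, 0 to 7).mkString(", "))
  }
}
```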

Closes #589
 
Signed-off-by: Gera Shegalov <gera@apache.org>
gerashegalov authored Mar 9, 2022
1 parent 9f727bd commit 98a731f
Showing 6 changed files with 259 additions and 38 deletions.
1 change: 1 addition & 0 deletions docs/configs.md
@@ -98,6 +98,7 @@ Name | Description | Default Value
<a name="sql.incompatibleDateFormats.enabled"></a>spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, some formats are fully supported on the GPU and some are unsupported and will fall back to the CPU. Some formats behave differently on the GPU than the CPU. Spark on the CPU interprets date formats with unsupported trailing characters as nulls, while Spark on the GPU will parse the date with invalid trailing characters. More detail can be found at [parsing strings as dates or timestamps](compatibility.md#parsing-strings-as-dates-or-timestamps).|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.join.cross.enabled"></a>spark.rapids.sql.join.cross.enabled|When set to true cross joins are enabled on the GPU|true
<a name="sql.join.existence.enabled"></a>spark.rapids.sql.join.existence.enabled|When set to true existence joins are enabled on the GPU|true
<a name="sql.join.fullOuter.enabled"></a>spark.rapids.sql.join.fullOuter.enabled|When set to true full outer joins are enabled on the GPU|true
<a name="sql.join.inner.enabled"></a>spark.rapids.sql.join.inner.enabled|When set to true inner joins are enabled on the GPU|true
<a name="sql.join.leftAnti.enabled"></a>spark.rapids.sql.join.leftAnti.enabled|When set to true left anti joins are enabled on the GPU|true
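The new flag behaves like the other join enablement confs; a hedged usage sketch (Scala Spark session API, the setting shown is illustrative):

```scala
// Illustrative: force existence joins back onto the CPU if needed.
// The default introduced by this commit is true (GPU enabled).
spark.conf.set("spark.rapids.sql.join.existence.enabled", "false")
```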
69 changes: 53 additions & 16 deletions integration_tests/src/main/python/join_test.py
@@ -779,27 +779,64 @@ def do_join(spark):
# If the condition is something like an AND, it makes the result a subset of a SemiJoin, and
# the optimizer won't use ExistenceJoin.
@ignore_order(local=True)
@pytest.mark.parametrize(
    "allowFallback", [
        pytest.param('true',
            marks=pytest.mark.allow_non_gpu('SortMergeJoinExec')),
        pytest.param('false',
            marks=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/589"))
    ], ids=idfn
)
def test_existence_join(allowFallback, spark_tmp_table_factory):
@pytest.mark.parametrize('numComplementsToExists', [0, 1, 2], ids=(lambda val: f"complements:{val}"))
@pytest.mark.parametrize('aqeEnabled', [
    pytest.param(False, id='aqe:off'),
    # workaround: somehow AQE retains RDDScanExec preventing parent ShuffleExchangeExec
    # from being executed on GPU
    # pytest.param(True, marks=pytest.mark.allow_non_gpu('ShuffleExchangeExec'), id='aqe:on')
])
@pytest.mark.parametrize('conditionalJoin', [False, True], ids=['ast:off', 'ast:on'])
@pytest.mark.parametrize('forceBroadcastHashJoin', [False, True], ids=['broadcastHJ:off', 'broadcastHJ:on'])
def test_existence_join(numComplementsToExists, aqeEnabled, conditionalJoin, forceBroadcastHashJoin, spark_tmp_table_factory):
    leftTable = spark_tmp_table_factory.get()
    rightTable = spark_tmp_table_factory.get()
    def do_join(spark):
        # create non-overlapping ranges to have a mix of exists=true and exists=false
        spark.createDataFrame([v] for v in range(2, 10)).createOrReplaceTempView(leftTable)
        spark.createDataFrame([v] for v in range(0, 8)).createOrReplaceTempView(rightTable)

        # left-hand side rows
        lhs_upper_bound = 10
        lhs_data = list((f"left_{v}", v * 10, v * 100) for v in range(2, lhs_upper_bound))
        # duplicate without a match
        lhs_data.append(('left_1', 10, 100))
        # duplicate with a match
        lhs_data.append(('left_2', 20, 200))
        lhs_data.append(('left_null', None, None))
        df_left = spark.createDataFrame(lhs_data)
        df_left.createOrReplaceTempView(leftTable)

        rhs_data = list((f"right_{v}", v * 10, v * 100) for v in range(0, 8))
        rhs_data.append(('right_null', None, None))
        # duplicate every row in the rhs to verify it does not affect
        # the number of output rows, which should be equal to the left table row count
        rhs_data_with_dupes = []
        for dupe in rhs_data:
            rhs_data_with_dupes.extend([dupe, dupe])

        df_right = spark.createDataFrame(rhs_data_with_dupes)
        df_right.createOrReplaceTempView(rightTable)
        cond = "<=" if conditionalJoin else "="
        res = spark.sql((
            "select * "
            "from {} as l "
            "where l._1 < 0 "
            " OR l._1 in (select * from {} as r)"
        ).format(leftTable, rightTable))
            f"where l._2 >= {10 * (lhs_upper_bound - numComplementsToExists)}"
            " or exists (select * from {} as r where r._2 = l._2 and r._3 {} l._3)"
        ).format(leftTable, rightTable, cond))
        return res
    assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, r".+Join ExistenceJoin\(exists#[0-9]+\).+")

    existenceJoinRegex = r"ExistenceJoin\(exists#[0-9]+\),"
    if conditionalJoin:
        existenceJoinRegex = existenceJoinRegex + r" \(.+ <= .+\)"

    if forceBroadcastHashJoin:
        # hints don't work with ExistenceJoin
        # forcing by upping the size to the estimated right output
        bhjThreshold = "9223372036854775807b"
        existenceJoinRegex = r'BroadcastHashJoin .* ' + existenceJoinRegex
    else:
        bhjThreshold = "-1b"

    assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, existenceJoinRegex,
        conf={
            "spark.sql.adaptive.enabled": aqeEnabled,
            "spark.sql.autoBroadcastJoinThreshold": bhjThreshold
        })
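For context, a spark-shell style sketch of the query shape this test exercises and the plan node whose string the regex must match (attribute ids and exact plan formatting vary by Spark version, so the commented output is approximate, not verbatim):

```scala
// A predicate OR-ed with an EXISTS subquery cannot be planned as a plain semijoin,
// so Catalyst plans it as ExistenceJoin with a synthetic `exists` attribute that
// feeds the Filter above the join.
spark.range(10).createOrReplaceTempView("l")
spark.range(5).createOrReplaceTempView("r")
spark.sql(
  "select * from l where l.id > 8 or exists (select * from r where r.id = l.id)"
).explain()
// Physical plan contains, approximately:
//   Filter ((id#0L > 8) OR exists#6)
//   +- BroadcastHashJoin [id#0L], [id#3L], ExistenceJoin(exists#6), BuildRight
```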
@@ -16,16 +16,33 @@

package com.nvidia.spark.rapids

import scala.collection.mutable

import ai.rapids.cudf.{GatherMap, NvtxColor, OutOfBoundsPolicy}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.vectorized.ColumnarBatch

trait TaskAutoCloseableResource extends AutoCloseable {
  protected var closed = false
  // iteration-independent resources
  private val resources = scala.collection.mutable.ArrayBuffer[AutoCloseable]()
  def use[T <: AutoCloseable](ac: T): T = {
    resources += ac
    ac
  }

  override def close() = if (!closed) {
    closed = true
    resources.reverse.safeClose()
    resources.clear()
  }

  TaskContext.get().addTaskCompletionListener[Unit](_ => close())
}

/**
* Base class for iterators producing the results of a join.
* @param gatherNvtxName name to use for the NVTX range when producing the join gather maps
@@ -37,14 +54,13 @@ abstract class AbstractGpuJoinIterator(
    gatherNvtxName: String,
    targetSize: Long,
    val opTime: GpuMetric,
    joinTime: GpuMetric) extends Iterator[ColumnarBatch] with Arm with AutoCloseable {
    joinTime: GpuMetric)
  extends Iterator[ColumnarBatch]
  with Arm
  with TaskAutoCloseableResource {
  private[this] var nextCb: Option[ColumnarBatch] = None
  private[this] var gathererStore: Option[JoinGatherer] = None

  protected[this] var closed = false

  TaskContext.get().addTaskCompletionListener[Unit](_ => close())

  /** Returns whether there are any more batches on the stream side of the join */
  protected def hasNextStreamBatch: Boolean

@@ -158,7 +174,7 @@ abstract class SplittableJoinIterator(
  // For some join types even if there is no stream data we might output something
  private var isInitialJoin = true
  // If the join explodes this holds batches from the stream side split into smaller pieces.
  private val pendingSplits = mutable.Queue[SpillableColumnarBatch]()
  private val pendingSplits = scala.collection.mutable.Queue[SpillableColumnarBatch]()

  protected def computeNumJoinRows(cb: ColumnarBatch): Long

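The new `TaskAutoCloseableResource` trait factors out the close-on-task-completion bookkeeping that `AbstractGpuJoinIterator` previously carried inline. Below is a standalone, simplified sketch of the same registration pattern (names are illustrative, and it omits the Spark `TaskContext` completion listener so it runs without a Spark task):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for TaskAutoCloseableResource: register iteration-independent
// resources once and close them exactly once, in reverse registration order.
trait CloseableRegistry extends AutoCloseable {
  protected var closed = false
  private val resources = ArrayBuffer[AutoCloseable]()

  def use[T <: AutoCloseable](ac: T): T = {
    resources += ac
    ac
  }

  override def close(): Unit = if (!closed) {
    closed = true
    resources.reverse.foreach(_.close())
    resources.clear()
  }
}

object CloseableRegistryDemo extends App {
  final class Tracked(name: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed $name")
  }
  val holder = new CloseableRegistry {}
  holder.use(new Tracked("a"))
  holder.use(new Tracked("b"))
  holder.close() // prints "closed b" then "closed a"
}
```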
@@ -418,8 +418,9 @@ object ExecutionPlanCaptureCallback {
      case p if p.expressions.exists(containsExpression(_, className, regexMap)) =>
        true
      case p: SparkPlan =>
        val sparkPlanStringForRegex = p.verboseStringWithSuffix(1000)
        regexMap.getOrElseUpdate(className, className.r)
          .findFirstIn(p.simpleStringWithNodeId())
          .findFirstIn(sparkPlanStringForRegex)
          .nonEmpty
    }.nonEmpty
  }
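Sketch of why the captured string changed, reusing the `l`/`r` views from the earlier example (the comments describe the approximate shape of the two strings, not verbatim Spark output):

```scala
val plan = spark.sql(
  "select * from l where l.id > 8 or exists (select * from r where r.id = l.id)"
).queryExecution.executedPlan
plan.foreach { p =>
  // simpleStringWithNodeId() is roughly "<nodeName> (<id>)" and omits node arguments,
  // while verboseStringWithSuffix(1000) includes them, e.g. "ExistenceJoin(exists#9)",
  // which is what the ExistenceJoin regex needs to find in the GPU plan.
  println(p.simpleStringWithNodeId())
  println(p.verboseStringWithSuffix(1000))
}
```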
@@ -697,6 +697,11 @@ object RapidsConf {
    .booleanConf
    .createWithDefault(true)

  val ENABLE_EXISTENCE_JOIN = conf("spark.rapids.sql.join.existence.enabled")
    .doc("When set to true existence joins are enabled on the GPU")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_PROJECT_AST = conf("spark.rapids.sql.projectAstEnabled")
    .doc("Enable project operations to use cudf AST expressions when possible.")
    .internal()
@@ -1563,6 +1568,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

  lazy val areLeftAntiJoinsEnabled: Boolean = get(ENABLE_LEFT_ANTI_JOIN)

  lazy val areExistenceJoinsEnabled: Boolean = get(ENABLE_EXISTENCE_JOIN)

  lazy val isCastDecimalToFloatEnabled: Boolean = get(ENABLE_CAST_DECIMAL_TO_FLOAT)

  lazy val isCastFloatToDecimalEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_DECIMAL)