diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py
index 74d277fb905..e06fd5ceb16 100644
--- a/integration_tests/src/main/python/join_test.py
+++ b/integration_tests/src/main/python/join_test.py
@@ -1153,11 +1153,13 @@ def do_join(spark):
         "spark.rapids.sql.input." + scan_name: False})

 @ignore_order(local=True)
+@pytest.mark.parametrize("join_type", ["Inner", "FullOuter"], ids=idfn)
 @pytest.mark.parametrize("is_left_host_shuffle", [False, True], ids=idfn)
 @pytest.mark.parametrize("is_right_host_shuffle", [False, True], ids=idfn)
 @pytest.mark.parametrize("is_left_smaller", [False, True], ids=idfn)
 @pytest.mark.parametrize("batch_size", ["1024", "1g"], ids=idfn)
-def test_new_inner_join(is_left_host_shuffle, is_right_host_shuffle, is_left_smaller, batch_size):
+def test_new_symmetric_join(join_type, is_left_host_shuffle, is_right_host_shuffle,
+                            is_left_smaller, batch_size):
     join_conf = {
         "spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true",
         "spark.sql.autoBroadcastJoinThreshold": "1",
@@ -1183,14 +1185,17 @@ def do_join(spark):
             left_df = left_df.groupBy("key1", "key2").max("ints", "floats")
         if not is_right_host_shuffle:
             right_df = right_df.groupBy("key1", "key2").max("doubles", "shorts")
-        return left_df.join(right_df, ["key1", "key2"], "inner")
+        return left_df.join(right_df, ["key1", "key2"], join_type)
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=join_conf)

 @ignore_order(local=True)
+@pytest.mark.parametrize("join_type", ["Inner", "FullOuter"], ids=idfn)
 @pytest.mark.parametrize("is_left_smaller", [False, True], ids=idfn)
 @pytest.mark.parametrize("is_ast_supported", [False, True], ids=idfn)
 @pytest.mark.parametrize("batch_size", ["1024", "1g"], ids=idfn)
-def test_new_inner_join_conditional(is_ast_supported, is_left_smaller, batch_size):
+def test_new_symmetric_join_conditional(join_type, is_ast_supported, is_left_smaller, batch_size):
+    if join_type == "FullOuter" and not is_ast_supported:
+        pytest.skip("Full outer joins do not support a non-AST condition")
     join_conf = {
         "spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true",
         "spark.sql.autoBroadcastJoinThreshold": "1",
@@ -1213,5 +1218,5 @@ def do_join(spark):
         else:
             # AST does not support logarithm yet
             cond.append(left_df.ints >= f.log(right_df.ints))
-        return left_df.join(right_df, cond, "inner")
+        return left_df.join(right_df, cond, join_type)
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=join_conf)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
index d9a5d5b2f3e..685a0a5bc0e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, JoinType, LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
@@ -70,8 +70,9 @@ class GpuShuffledHashJoinMeta(
     }
     val Seq(left, right) = childPlans.map(_.convertIfNeeded())
     val joinExec = join.joinType match {
-      case Inner if conf.useShuffledSymmetricHashJoin =>
+      case Inner | FullOuter if conf.useShuffledSymmetricHashJoin =>
         GpuShuffledSymmetricHashJoinExec(
+          join.joinType,
           leftKeys.map(_.convertToGpu()),
           rightKeys.map(_.convertToGpu()),
           joinCondition,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala
index fa4885f848a..d4086dedf54 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSymmetricHashJoinExec.scala
@@ -31,11 +31,11 @@ import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType}
 import org.apache.spark.sql.catalyst.plans.physical.Distribution
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
-import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase, HashJoinIterator}
+import org.apache.spark.sql.rapids.execution.{ConditionalHashJoinIterator, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase, HashFullJoinIterator, HashFullJoinStreamSideIterator, HashJoinIterator}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -58,6 +58,7 @@ object GpuShuffledSymmetricHashJoinExec {
     * be called with the build side that was dynamically determined after probing the join inputs.
     */
    def bind(
+       joinType: JoinType,
        leftKeys: Seq[Expression],
        leftOutput: Seq[Attribute],
        rightKeys: Seq[Expression],
@@ -76,7 +77,12 @@ object GpuShuffledSymmetricHashJoinExec {
        case GpuBuildRight => (boundRightKeys, rightTypes, boundLeftKeys, leftTypes, leftOutput)
        case GpuBuildLeft => (boundLeftKeys, leftTypes, boundRightKeys, rightTypes, rightOutput)
      }
-     val compareNullsEqual = GpuHashJoin.anyNullableStructChild(boundBuildKeys)
+     // For join types other than FullOuter, we simply set compareNullsEqual as true to adapt
+     // struct keys with nullable children. Non-nested keys can also be correctly processed with
+     // compareNullsEqual = true, because we filter all null records from build table before join.
+     // For details, see https://github.com/NVIDIA/spark-rapids/issues/2126.
+     val compareNullsEqual = (joinType != FullOuter) &&
+       GpuHashJoin.anyNullableStructChild(boundBuildKeys)
      val needNullFilter = compareNullsEqual && boundBuildKeys.exists(_.nullable)
      BoundJoinExprs(boundBuildKeys, buildTypes, boundStreamKeys, streamTypes, streamOutput,
        boundCondition, leftOutput.size, compareNullsEqual, needNullFilter)
@@ -85,6 +91,7 @@ object GpuShuffledSymmetricHashJoinExec {

  /** Utility class to track information related to a join. */
  class JoinInfo(
+     val joinType: JoinType,
      val buildSide: GpuBuildSide,
      val buildIter: Iterator[ColumnarBatch],
      val buildSize: Long,
@@ -135,6 +142,7 @@ object GpuShuffledSymmetricHashJoinExec {
   * Probe the left and right join inputs to determine which side should be used as the build
   * side and which should be used as the stream side.
   *
+  * @param joinType type of join to perform
   * @param leftKeys join keys for the left table
   * @param leftOutput schema of the left table
   * @param rawLeftIter iterator of batches for the left table
@@ -147,6 +155,7 @@ object GpuShuffledSymmetricHashJoinExec {
   * @return join information including build side, bound expressions, etc.
   */
  def getJoinInfo(
+     joinType: JoinType,
      leftKeys: Seq[Expression],
      leftOutput: Seq[Attribute],
      rawLeftIter: Iterator[ColumnarBatch],
@@ -190,7 +199,7 @@ object GpuShuffledSymmetricHashJoinExec {
          }
        }
      }
-     val exprs = BoundJoinExprs.bind(leftKeys, leftOutput, rightKeys, rightOutput,
+     val exprs = BoundJoinExprs.bind(joinType, leftKeys, leftOutput, rightKeys, rightOutput,
        condition, buildSide)
      val (buildQueue, buildSize, streamQueue, rawStreamIter) = buildSide match {
        case GpuBuildRight =>
@@ -213,7 +222,7 @@ object GpuShuffledSymmetricHashJoinExec {
      val streamIter = new CollectTimeIterator("fetch join stream",
        setupForJoin(streamQueue, rawStreamIter, exprs.streamTypes, gpuBatchSizeBytes, metrics),
        streamTime)
-     new JoinInfo(buildSide, buildIter, buildSize, streamIter, exprs)
+     new JoinInfo(joinType, buildSide, buildIter, buildSize, streamIter, exprs)
    }
  }
 }
@@ -301,26 +310,33 @@ object GpuShuffledSymmetricHashJoinExec {
      gpuBatchSizeBytes: Long,
      opTime: GpuMetric,
      joinTime: GpuMetric): Iterator[ColumnarBatch] = {
-    if (info.exprs.boundCondition.isDefined) {
-      // ConditionalHashJoinIterator will close the compiled condition
-      val compiledCondition = info.exprs.boundCondition.get.convertToAst(
-        info.exprs.numFirstConditionTableColumns).compile()
-      new ConditionalHashJoinIterator(spillableBuiltBatch, info.exprs.boundBuildKeys,
-        lazyStream, info.exprs.boundStreamKeys, info.exprs.streamOutput, compiledCondition,
-        gpuBatchSizeBytes, Inner, info.buildSide, info.exprs.compareNullsEqual,
-        opTime, joinTime)
-    } else {
-      new HashJoinIterator(spillableBuiltBatch, info.exprs.boundBuildKeys,
-        lazyStream, info.exprs.boundStreamKeys, info.exprs.streamOutput,
-        gpuBatchSizeBytes, Inner, info.buildSide, info.exprs.compareNullsEqual,
-        opTime, joinTime)
+    info.joinType match {
+      case FullOuter =>
+        new HashFullJoinIterator(spillableBuiltBatch, info.exprs.boundBuildKeys, None,
+          lazyStream, info.exprs.boundStreamKeys, info.exprs.streamOutput,
+          info.exprs.boundCondition, info.exprs.numFirstConditionTableColumns,
+          gpuBatchSizeBytes, info.buildSide, info.exprs.compareNullsEqual, opTime, joinTime)
+      case _ if info.exprs.boundCondition.isDefined =>
+        // ConditionalHashJoinIterator will close the compiled condition
+        val compiledCondition = info.exprs.boundCondition.get.convertToAst(
+          info.exprs.numFirstConditionTableColumns).compile()
+        new ConditionalHashJoinIterator(spillableBuiltBatch, info.exprs.boundBuildKeys,
+          lazyStream, info.exprs.boundStreamKeys, info.exprs.streamOutput, compiledCondition,
+          gpuBatchSizeBytes, info.joinType, info.buildSide, info.exprs.compareNullsEqual,
+          opTime, joinTime)
+      case _ =>
+        new HashJoinIterator(spillableBuiltBatch, info.exprs.boundBuildKeys,
+          lazyStream, info.exprs.boundStreamKeys, info.exprs.streamOutput,
+          gpuBatchSizeBytes, info.joinType, info.buildSide, info.exprs.compareNullsEqual,
+          opTime, joinTime)
     }
   }
 }

 /**
- * A GPU shuffled hash join optimized to handle inner joins. Probes the sizes of the input tables
- * before performing the join to determine which to use as the build side.
+ * A GPU shuffled hash join optimized to handle symmetric joins like inner and full outer.
+ * Probes the sizes of the input tables before performing the join to determine which to use
+ * as the build side.
  *
  * @param leftKeys join keys for the left table
 * @param rightKeys join keys for the right table
@@ -333,6 +349,7 @@ object GpuShuffledSymmetricHashJoinExec {
 * @param cpuRightKeys original CPU expressions for the right join keys
 */
 case class GpuShuffledSymmetricHashJoinExec(
+    joinType: JoinType,
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    condition: Option[Expression],
@@ -360,13 +377,21 @@ case class GpuShuffledSymmetricHashJoinExec(
    Seq(GpuHashPartitioning.getDistribution(cpuLeftKeys),
      GpuHashPartitioning.getDistribution(cpuRightKeys))

-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+    case _: InnerLike => left.output ++ right.output
+    case FullOuter =>
+      left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+    case x =>
+      throw new IllegalArgumentException(s"unsupported join type: $x")
+  }
+

  override def doExecute(): RDD[InternalRow] = {
    throw new IllegalStateException(s"${this.getClass} does not support row-based execution")
  }

  override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
+    val localJoinType = joinType
    val localLeftKeys = leftKeys
    val leftOutput = left.output
    val isLeftHost = isHostBatchProducer(left)
@@ -379,24 +404,24 @@ case class GpuShuffledSymmetricHashJoinExec(
    left.executeColumnar().zipPartitions(right.executeColumnar()) { case (leftIter, rightIter) =>
      val joinInfo = (isLeftHost, isRightHost) match {
        case (true, true) =>
-          getHostHostJoinInfo(localLeftKeys, leftOutput, leftIter,
+          getHostHostJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter,
            localRightKeys, rightOutput, rightIter,
            localCondition, localGpuBatchSizeBytes, localMetrics)
        case (true, false) =>
-          getHostGpuJoinInfo(localLeftKeys, leftOutput, leftIter,
+          getHostGpuJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter,
            localRightKeys, rightOutput, rightIter,
            localCondition, localGpuBatchSizeBytes, localMetrics)
        case (false, true) =>
-          getGpuHostJoinInfo(localLeftKeys, leftOutput, leftIter,
+          getGpuHostJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter,
            localRightKeys, rightOutput, rightIter,
            localCondition, localGpuBatchSizeBytes, localMetrics)
        case (false, false) =>
-          getGpuGpuJoinInfo(localLeftKeys, leftOutput, leftIter,
+          getGpuGpuJoinInfo(localJoinType, localLeftKeys, leftOutput, leftIter,
            localRightKeys, rightOutput, rightIter,
            localCondition, localGpuBatchSizeBytes, localMetrics)
      }
      val joinIterator = if (joinInfo.buildSize <= localGpuBatchSizeBytes) {
-        if (joinInfo.buildSize == 0) {
+        if (localJoinType.isInstanceOf[InnerLike] && joinInfo.buildSize == 0) {
          Iterator.empty
        } else {
          doSmallBuildJoin(joinInfo, localGpuBatchSizeBytes, localMetrics)
@@ -448,8 +473,12 @@ case class GpuShuffledSymmetricHashJoinExec(
      concatTime = metricsMap(CONCAT_TIME),
      opTime = opTime,
      opName = "build batch")
-    assert(buildIter.hasNext, "build side should not be empty")
-    val spillableBuiltBatch = withResource(buildIter.next()) { batch =>
+    val batch = if (buildIter.hasNext) {
+      buildIter.next()
+    } else {
+      GpuColumnVector.emptyBatchFromTypes(info.exprs.buildTypes)
+    }
+    val spillableBuiltBatch = withResource(batch) { batch =>
      assert(!buildIter.hasNext, "build side should have a single batch")
      LazySpillableColumnarBatch(batch, "built")
    }
@@ -469,7 +498,7 @@ case class GpuShuffledSymmetricHashJoinExec(
      info: JoinInfo,
      gpuBatchSizeBytes: Long,
      metricsMap: Map[String, GpuMetric]): Iterator[ColumnarBatch] = {
-    new BigInnerJoinIterator(info, gpuBatchSizeBytes, metricsMap)
+    new BigSymmetricJoinIterator(info, gpuBatchSizeBytes, metricsMap)
  }

  /**
@@ -477,6 +506,7 @@ case class GpuShuffledSymmetricHashJoinExec(
   * inputs are coming from a shuffle when not using a GPU-centered shuffle manager).
   */
  private def getHostHostJoinInfo(
+      joinType: JoinType,
      leftKeys: Seq[Expression],
      leftOutput: Seq[Attribute],
      leftIter: Iterator[ColumnarBatch],
@@ -487,7 +517,7 @@ case class GpuShuffledSymmetricHashJoinExec(
      gpuBatchSizeBytes: Long,
      metrics: Map[String, GpuMetric]): JoinInfo = {
    val sizer = new HostHostJoinSizer()
-    sizer.getJoinInfo(leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
+    sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
      condition, gpuBatchSizeBytes, metrics)
  }

@@ -496,6 +526,7 @@ case class GpuShuffledSymmetricHashJoinExec(
   * right table is coming from GPU memory.
   */
  private def getHostGpuJoinInfo(
+      joinType: JoinType,
      leftKeys: Seq[Expression],
      leftOutput: Seq[Attribute],
      rawLeftIter: Iterator[ColumnarBatch],
@@ -511,7 +542,7 @@ case class GpuShuffledSymmetricHashJoinExec(
      new HostShuffleCoalesceIterator(rawLeftIter, gpuBatchSizeBytes, concatMetrics),
      leftOutput.map(_.dataType).toArray,
      concatMetrics)
-    sizer.getJoinInfo(leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
+    sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
      condition, gpuBatchSizeBytes, metrics)
  }

@@ -520,6 +551,7 @@ case class GpuShuffledSymmetricHashJoinExec(
   * left table is coming from host memory.
   */
  private def getGpuHostJoinInfo(
+      joinType: JoinType,
      leftKeys: Seq[Expression],
      leftOutput: Seq[Attribute],
      leftIter: Iterator[ColumnarBatch],
@@ -535,7 +567,7 @@ case class GpuShuffledSymmetricHashJoinExec(
      new HostShuffleCoalesceIterator(rawRightIter, gpuBatchSizeBytes, concatMetrics),
      rightOutput.map(_.dataType).toArray,
      concatMetrics)
-    sizer.getJoinInfo(leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
+    sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
      condition, gpuBatchSizeBytes, metrics)
  }

@@ -543,6 +575,7 @@ case class GpuShuffledSymmetricHashJoinExec(
   * Probe for the join information when both inputs are coming from GPU memory.
   */
  private def getGpuGpuJoinInfo(
+      joinType: JoinType,
      leftKeys: Seq[Expression],
      leftOutput: Seq[Attribute],
      leftIter: Iterator[ColumnarBatch],
@@ -553,7 +586,7 @@ case class GpuShuffledSymmetricHashJoinExec(
      gpuBatchSizeBytes: Long,
      metrics: Map[String, GpuMetric]): JoinInfo = {
    val sizer = new SpillableColumnarBatchJoinSizer(startWithLeftSide = true)
-    sizer.getJoinInfo(leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
+    sizer.getJoinInfo(joinType, leftKeys, leftOutput, leftIter, rightKeys, rightOutput, rightIter,
      condition, gpuBatchSizeBytes, metrics)
  }

@@ -826,6 +859,7 @@ object JoinPartitioner {
 * Join partitioner for the build side of a large join where the build side of the join does not
 * fit in a single GPU batch.
 *
+ * @param joinType type of join being performed
 * @param numPartitions number of partitions being used in the join
 * @param buildSideIter iterator of build side batches
 * @param buildSideTypes schema of the build side batches
@@ -834,6 +868,7 @@ object JoinPartitioner {
 * @param metrics metrics to update
 */
 class BuildSidePartitioner(
+    val joinType: JoinType,
    val numPartitions: Int,
    buildSideIter: Iterator[ColumnarBatch],
    buildSideTypes: Array[DataType],
@@ -879,7 +914,6 @@ class BuildSidePartitioner(
    closeOnExcept(spillBatchesBuffer) { _ =>
      joinGroups(partitionGroupIndex).foreach { i =>
        val batches = partitions(i).releaseBatches()
-        assert(batches.nonEmpty)
        spillBatchesBuffer ++= batches
      }
    }
@@ -919,15 +953,22 @@ class BuildSidePartitioner(
    val emptyPartitions = new mutable.BitSet(numPartitions)
    val joinGroups = new mutable.ArrayBuffer[BitSet]()
    val sortedIndices = (0 until numPartitions).sortBy(i => partitions(i).getTotalSize)
-    val (emptyIndices, nonEmptyIndices) = sortedIndices.partition { i =>
-      partitions(i).getTotalSize == 0
+    val (emptyIndices, nonEmptyIndices) = joinType match {
+      case FullOuter =>
+        // empty build partitions still need to produce outputs for corresponding stream partitions
+        (Seq.empty, sortedIndices)
+      case _: InnerLike =>
+        sortedIndices.partition { i =>
+          partitions(i).getTotalSize == 0
+        }
+      case x => throw new IllegalStateException(s"unsupported join type: $x")
    }
    emptyPartitions ++= emptyIndices
    var group = new mutable.BitSet(numPartitions)
    var groupSize = 0L
    nonEmptyIndices.foreach { i =>
      val newSize = groupSize + partitions(i).getTotalSize
-      if (newSize > gpuBatchSizeBytes) {
+      if (groupSize > 0L && newSize > gpuBatchSizeBytes) {
        if (group.nonEmpty) {
          joinGroups.append(group)
        }
@@ -991,7 +1032,7 @@ class StreamSidePartitioner(
 }

 /**
- * Iterator that produces the result of a large inner join where the build side of the join is
+ * Iterator that produces the result of a large symmetric join where the build side of the join is
 * too large for a single GPU batch. The prior join input probing phase has sized the build side
 * of the join, so this partitions both the build side and stream side into N+1 partitions, where
 * N is the size of the build side divided by the target GPU batch size.
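Note (illustrative, not part of the patch): the two grouping changes above are easy to miss in the diff. For full outer joins, empty build partitions can no longer be dropped, because their corresponding stream partitions must still produce null-extended rows, and a join group is only closed once it already holds data, so a single oversized partition still forms its own group. Below is a minimal, self-contained Scala sketch of that greedy grouping under those two rules, using hypothetical names (JoinGroupingSketch, groupPartitions) rather than the plugin's BuildSidePartitioner.

import scala.collection.mutable.ArrayBuffer

object JoinGroupingSketch {
  // Groups build-side partition sizes into join groups no larger than targetBytes.
  // Returns (indices of dropped empty partitions, join groups of partition indices).
  def groupPartitions(
      sizes: Seq[Long],
      targetBytes: Long,
      isFullOuter: Boolean): (Seq[Int], Seq[Seq[Int]]) = {
    val sortedIndices = sizes.indices.sortBy(i => sizes(i))
    val (emptyIndices, nonEmptyIndices) =
      if (isFullOuter) {
        // full outer: keep empty build partitions so their stream partitions still emit rows
        (Seq.empty[Int], sortedIndices)
      } else {
        sortedIndices.partition(i => sizes(i) == 0L)
      }
    val groups = ArrayBuffer.empty[Seq[Int]]
    var group = Vector.empty[Int]
    var groupSize = 0L
    nonEmptyIndices.foreach { i =>
      // only close a group that already holds data, so one oversized partition
      // still ends up in a group of its own
      if (groupSize > 0L && groupSize + sizes(i) > targetBytes) {
        groups += group
        group = Vector.empty[Int]
        groupSize = 0L
      }
      group :+= i
      groupSize += sizes(i)
    }
    if (group.nonEmpty) groups += group
    (emptyIndices, groups.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val sizes = Seq(0L, 512L, 3000L, 256L)
    // inner-like: the empty partition 0 is dropped and oversized partition 2 forms its own group
    println(groupPartitions(sizes, targetBytes = 1024L, isFullOuter = false))
    // full outer: partition 0 is kept inside a join group instead of being discarded
    println(groupPartitions(sizes, targetBytes = 1024L, isFullOuter = true))
  }
}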
@@ -1015,7 +1056,7 @@ class StreamSidePartitioner(
 * @param gpuBatchSizeBytes target GPU batch size
 * @param metrics metrics to update
 */
-class BigInnerJoinIterator(
+class BigSymmetricJoinIterator(
    info: JoinInfo,
    gpuBatchSizeBytes: Long,
    metrics: Map[String, GpuMetric])
@@ -1024,21 +1065,38 @@
  private val buildPartitioner = {
    val numPartitions = (info.buildSize / gpuBatchSizeBytes) + 1
    require(numPartitions <= Int.MaxValue, "too many build partitions")
-    new BuildSidePartitioner(numPartitions.toInt, info.buildIter, info.exprs.buildTypes,
-      info.exprs.boundBuildKeys, gpuBatchSizeBytes, metrics)
+    new BuildSidePartitioner(info.joinType, numPartitions.toInt, info.buildIter,
+      info.exprs.buildTypes, info.exprs.boundBuildKeys, gpuBatchSizeBytes, metrics)
  }
  use(buildPartitioner)

  private val joinGroups = buildPartitioner.getJoinGroups
-  private var nextJoinGroupIndex = joinGroups.length
+  private var currentJoinGroupIndex = joinGroups.length - 1

-  private val streamPartitioner = new StreamSidePartitioner(buildPartitioner.numPartitions,
+  private val streamPartitioner = use(new StreamSidePartitioner(buildPartitioner.numPartitions,
    buildPartitioner.getEmptyPartitions, info.streamIter, info.exprs.streamTypes,
-    info.exprs.boundStreamKeys, metrics)
-  use(streamPartitioner)
+    info.exprs.boundStreamKeys, metrics))

  private var subIter: Option[Iterator[ColumnarBatch]] = None
+
+  // Buffer per join group to track build-side rows that have been referenced for full outer joins
+  private val buildSideRowTrackers: Array[Option[SpillableColumnarBatch]] = {
+    if (info.joinType == FullOuter) {
+      val arr = new Array[Option[SpillableColumnarBatch]](joinGroups.length)
+      arr.indices.foreach { i => arr(i) = None }
+      arr
+    } else {
+      Array.empty
+    }
+  }
+  private var isExhausted = joinGroups.isEmpty
+  private val opTime = metrics(OP_TIME)
+  private val joinTime = metrics(JOIN_TIME)
+
+  private lazy val compiledCondition = info.exprs.boundCondition.map { condExpr =>
+    use(opTime.ns(condExpr.convertToAst(info.exprs.numFirstConditionTableColumns).compile()))
+  }

  override def hasNext: Boolean = {
    if (isExhausted) {
@@ -1063,14 +1121,51 @@
    subIter.get.next()
  }

+  override def close(): Unit = {
+    super.close()
+    buildSideRowTrackers.flatten.safeClose()
+    isExhausted = true
+  }
+
  private def setupNextJoinIterator(): Unit = {
    while (!isExhausted && !subIter.exists(_.hasNext)) {
-      if (nextJoinGroupIndex >= joinGroups.length) {
+      if (info.joinType == FullOuter) {
+        // save off the build side tracker buffer for the join group just processed
+        subIter match {
+          case Some(streamIter: HashFullJoinStreamSideIterator) =>
+            assert(buildSideRowTrackers(currentJoinGroupIndex).isEmpty, "unexpected row tracker")
+            buildSideRowTrackers(currentJoinGroupIndex) = streamIter.releaseBuiltSideTracker()
+            streamIter.close()
+          case _ =>
+        }
+      }
+      if (currentJoinGroupIndex >= joinGroups.length - 1) {
        // try to pull in the next stream batch
        if (streamPartitioner.hasInputBatches) {
          streamPartitioner.partitionNextBatch()
-          nextJoinGroupIndex = 0
          subIter = Some(moveToNextBuildGroup())
+        } else if (info.joinType == FullOuter) {
+          currentJoinGroupIndex = buildSideRowTrackers.indexWhere(_.isDefined)
+          if (currentJoinGroupIndex == -1) {
+            isExhausted = true
+            subIter = None
+          } else {
+            // TODO: Can free the build-side batch in the build partitioner early here since
+            // this will be the last iterator to use it.
+            // https://github.com/NVIDIA/spark-rapids/issues/10282
+            val tracker = buildSideRowTrackers(currentJoinGroupIndex)
+            buildSideRowTrackers(currentJoinGroupIndex) = None
+            // Setup an iterator to produce the final full outer join batches for the join group.
+            // All stream batches have been consumed, so an empty iterator is used for the stream
+            // side. The condition also doesn't need to be passed since there are no join row pairs
+            // left to evaluate conditionally. The only rows that will be emitted by this are the
+            // build-side rows that never matched rows on the stream side.
+            subIter = Some(new HashFullJoinIterator(
+              buildPartitioner.getBuildBatch(currentJoinGroupIndex), info.exprs.boundBuildKeys,
+              tracker, Iterator.empty, info.exprs.boundStreamKeys, info.exprs.streamOutput,
+              None, 0, gpuBatchSizeBytes, info.buildSide, info.exprs.compareNullsEqual,
+              opTime, joinTime))
+          }
        } else {
          isExhausted = true
          subIter = None
@@ -1082,9 +1177,11 @@
  }

  private def moveToNextBuildGroup(): Iterator[ColumnarBatch] = {
-    val builtBatch = buildPartitioner.getBuildBatch(nextJoinGroupIndex)
-    val group = joinGroups(nextJoinGroupIndex)
-    nextJoinGroupIndex += 1
+    // If we were at the last build group, loop back to the first since we're processing the next
+    // stream batch.
+    currentJoinGroupIndex = (currentJoinGroupIndex + 1) % joinGroups.length
+    val builtBatch = buildPartitioner.getBuildBatch(currentJoinGroupIndex)
+    val group = joinGroups(currentJoinGroupIndex)
    val streamBatches = streamPartitioner.releasePartitions(group)
    val lazyStream = new Iterator[LazySpillableColumnarBatch] {
      onTaskCompletion(streamBatches.safeClose())
@@ -1103,7 +1200,18 @@
        }
      }
    }
-    GpuShuffledSymmetricHashJoinExec.createJoinIterator(info, builtBatch, lazyStream,
-      gpuBatchSizeBytes, metrics(OP_TIME), metrics(JOIN_TIME))
+    if (info.joinType == FullOuter) {
+      // Build an iterator to perform the stream-side of the full outer join for the join group,
+      // tracking which rows are referenced so far. The iterator will own the tracker of build side
+      // rows referenced until we release it after the iterator has produced all of the batches.
+      val buildRowTracker = buildSideRowTrackers(currentJoinGroupIndex)
+      buildSideRowTrackers(currentJoinGroupIndex) = None
+      new HashFullJoinStreamSideIterator(builtBatch, info.exprs.boundBuildKeys, buildRowTracker,
+        lazyStream, info.exprs.boundStreamKeys, info.exprs.streamOutput, compiledCondition,
+        gpuBatchSizeBytes, info.buildSide, info.exprs.compareNullsEqual, opTime, joinTime)
+    } else {
+      GpuShuffledSymmetricHashJoinExec.createJoinIterator(info, builtBatch, lazyStream,
+        gpuBatchSizeBytes, opTime, joinTime)
+    }
  }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala
index 15cabdcb7a9..acc4b2a80e1 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortMergeJoinMeta.scala
@@ -16,7 +16,7 @@
 package com.nvidia.spark.rapids

-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
 import org.apache.spark.sql.execution.SortExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks}
@@ -83,8 +83,9 @@ class GpuSortMergeJoinMeta(
     }
     val Seq(left, right) = childPlans.map(_.convertIfNeeded())
     val joinExec = join.joinType match {
-      case Inner if conf.useShuffledSymmetricHashJoin =>
+      case Inner | FullOuter if conf.useShuffledSymmetricHashJoin =>
         GpuShuffledSymmetricHashJoinExec(
+          join.joinType,
           leftKeys.map(_.convertToGpu()),
           rightKeys.map(_.convertToGpu()),
           joinCondition,
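Note (illustrative, not part of the patch): the BigSymmetricJoinIterator changes above hinge on remembering which build-side rows have been matched while the stream side arrives in multiple batches; only after the last stream batch can the still-unmatched build rows be emitted with nulls. The sketch below shows that idea in plain Scala with hypothetical names (FullOuterTrackerSketch, fullOuterJoin). In the real code the tracker is a spillable batch kept per join group and handed from HashFullJoinStreamSideIterator to the final HashFullJoinIterator.

import scala.collection.mutable.ArrayBuffer

object FullOuterTrackerSketch {
  // build: (key, value) rows held on the build side of one join group
  // streamBatches: stream-side rows arriving batch by batch
  def fullOuterJoin(
      build: Seq[(Int, String)],
      streamBatches: Seq[Seq[(Int, String)]]): Seq[(Option[String], Option[String])] = {
    val matched = Array.fill(build.length)(false) // stand-in for the spillable row tracker
    val out = ArrayBuffer.empty[(Option[String], Option[String])]
    streamBatches.foreach { batch =>
      batch.foreach { case (key, streamVal) =>
        val hits = build.indices.filter(i => build(i)._1 == key)
        if (hits.isEmpty) {
          out += ((None, Some(streamVal)))          // stream row with no build match
        } else {
          hits.foreach { i =>
            matched(i) = true                       // remember this build row was referenced
            out += ((Some(build(i)._2), Some(streamVal)))
          }
        }
      }
    }
    // final pass once every stream batch is done: unreferenced build rows, null-extended
    build.indices.filterNot(i => matched(i)).foreach { i =>
      out += ((Some(build(i)._2), None))
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val build = Seq(1 -> "b1", 2 -> "b2", 3 -> "b3")
    val streamBatches = Seq(Seq(1 -> "s1"), Seq(4 -> "s4", 2 -> "s2"))
    // emits (b1, s1), (null, s4), (b2, s2) while streaming, then (b3, null) at the end
    fullOuterJoin(build, streamBatches).foreach(println)
  }
}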