From d0c120de74cac40c419be6e7d08f732fba8d15bb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 9 Dec 2020 12:34:13 -0700 Subject: [PATCH] Refactor to stop inheriting from HashJoin (#1321) * Move GpuHashJoin out of shims and stop extending HashJoin Signed-off-by: Andy Grove * clean up imports Signed-off-by: Andy Grove --- .../spark300/GpuBroadcastHashJoinExec.scala | 10 +- .../spark300/GpuShuffledHashJoinExec.scala | 10 +- .../shims/spark300/GpuSortMergeJoinExec.scala | 3 +- .../rapids/shims/spark300/Spark300Shims.scala | 7 - .../spark301/GpuBroadcastHashJoinExec.scala | 17 +- .../spark301db/GpuBroadcastHashJoinExec.scala | 13 +- .../rapids/shims/spark301db/GpuHashJoin.scala | 328 ------------------ .../spark301db/GpuShuffledHashJoinExec.scala | 9 +- .../spark301db/GpuSortMergeJoinExec.scala | 3 +- .../shims/spark301db/Spark301dbShims.scala | 7 - .../spark310/GpuBroadcastHashJoinExec.scala | 30 +- .../rapids/shims/spark310/GpuHashJoin.scala | 328 ------------------ .../spark310/GpuShuffledHashJoinExec.scala | 14 +- .../shims/spark310/GpuSortMergeJoinExec.scala | 3 +- .../rapids/shims/spark310/Spark310Shims.scala | 7 - .../joins/HashJoinWithoutCodegen.scala | 39 --- .../spark/rapids/GpuTransitionOverrides.scala | 4 +- .../com/nvidia/spark/rapids/SparkShims.scala | 1 - .../sql/rapids/execution}/GpuHashJoin.scala | 55 ++- .../spark/rapids/BroadcastHashJoinSuite.scala | 8 +- 20 files changed, 103 insertions(+), 793 deletions(-) delete mode 100644 shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala delete mode 100644 shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala delete mode 100644 shims/spark310/src/main/scala/org/apache/spark/sql/execution/joins/HashJoinWithoutCodegen.scala rename {shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300 => sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution}/GpuHashJoin.scala (85%) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index ee10a5cd27e..970dc50b842 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -85,7 +85,7 @@ class GpuBroadcastHashJoinMeta( GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), - join.joinType, join.buildSide, + join.joinType, GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), left, right) } @@ -95,7 +95,7 @@ case class GpuBroadcastHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends 
BinaryExecNode with GpuHashJoin { @@ -109,9 +109,9 @@ case class GpuBroadcastHashJoinExec( override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) buildSide match { - case BuildLeft => + case GpuBuildLeft => BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil - case BuildRight => + case GpuBuildRight => UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil } } diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala index c4948ec6935..b88c3a16ee3 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClustered import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, ShuffledHashJoinExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.GpuShuffledHashJoinBase +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -73,7 +73,7 @@ class GpuShuffledHashJoinMeta( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, - join.buildSide, + GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), childPlans(0).convertIfNeeded(), childPlans(1).convertIfNeeded(), @@ -84,7 +84,7 @@ case class GpuShuffledHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: SparkPlan, @@ -116,8 +116,8 @@ case class GpuShuffledHashJoinExec( } override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match { - case BuildLeft => Seq(RequireSingleBatch, null) - case BuildRight => Seq(null, RequireSingleBatch) + case GpuBuildLeft => Seq(RequireSingleBatch, null) + case GpuBuildRight => Seq(null, RequireSingleBatch) } override def doExecuteColumnar() : RDD[ColumnarBatch] = { diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala index 43bc1aef4d3..4c5594c468c 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuSortMergeJoinExec.scala @@ -21,6 +21,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, SortMergeJoinExec} +import org.apache.spark.sql.rapids.execution.GpuHashJoin import org.apache.spark.sql.types.DataType /** @@ -85,7 +86,7 @@ class GpuSortMergeJoinMeta( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, - buildSide, + GpuJoinUtils.getGpuBuildSide(buildSide), condition.map(_.convertToGpu()), childPlans(0).convertIfNeeded(), 
childPlans(1).convertIfNeeded(), diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index a454978e0b3..084225f3f96 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -105,13 +105,6 @@ class Spark300Shims extends SparkShims { queryStage.shuffle.asInstanceOf[GpuShuffleExchangeExecBase] } - override def isGpuHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuHashJoin => true - case _ => false - } - } - override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { plan match { case _: GpuBroadcastHashJoinExec => true diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index 363876848b7..79ddddfd55d 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -15,9 +15,9 @@ */ package com.nvidia.spark.rapids.shims.spark301 -import com.nvidia.spark.rapids.{BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuBroadcastJoinMeta, GpuColumnVector, GpuExec, GpuOverrides, GpuProjectExec, RapidsConf, RapidsMeta, SparkPlanMeta} +import com.nvidia.spark.rapids.{BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuBroadcastJoinMeta, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuOverrides, GpuProjectExec, RapidsConf, RapidsMeta, SparkPlanMeta} import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME} -import com.nvidia.spark.rapids.shims.spark300.GpuHashJoin +import com.nvidia.spark.rapids.shims.spark300.GpuJoinUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -27,9 +27,9 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Dist import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, BuildSide, HashedRelationBroadcastMode} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -82,7 +82,8 @@ class GpuBroadcastHashJoinMeta( GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), - join.joinType, join.buildSide, + join.joinType, + GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), left, right) } @@ -92,7 +93,7 @@ case class GpuBroadcastHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: 
SparkPlan) extends BinaryExecNode with GpuHashJoin { @@ -106,9 +107,9 @@ case class GpuBroadcastHashJoinExec( override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) buildSide match { - case BuildLeft => + case GpuBuildLeft => BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil - case BuildRight => + case GpuBuildRight => UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil } } diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala index 02d78fb4e15..b67ea375ecb 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuBroadcastHashJoinExec.scala @@ -15,7 +15,7 @@ */ package com.nvidia.spark.rapids.shims.spark301db -import com.nvidia.spark.rapids.{BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuBroadcastJoinMeta, GpuColumnVector, GpuExec, GpuOverrides, GpuProjectExec, RapidsConf, RapidsMeta, SparkPlanMeta} +import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME} import org.apache.spark.rdd.RDD @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -82,7 +82,8 @@ class GpuBroadcastHashJoinMeta( GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), - join.joinType, join.buildSide, + join.joinType, + GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), left, right) } @@ -92,7 +93,7 @@ case class GpuBroadcastHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { @@ -106,9 +107,9 @@ case class GpuBroadcastHashJoinExec( override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) buildSide match { - case BuildLeft => + case GpuBuildLeft => BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil - case BuildRight => + case GpuBuildRight => UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil } } diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala deleted file mode 100644 index 74f6626f8c5..00000000000 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuHashJoin.scala +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.nvidia.spark.rapids.shims.spark301db - -import ai.rapids.cudf.{NvtxColor, Table} -import com.nvidia.spark.rapids._ - -import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} -import org.apache.spark.sql.execution.joins.HashJoin -import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} - -object GpuHashJoin { - def tagJoin( - meta: RapidsMeta[_, _, _], - joinType: JoinType, - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - condition: Option[Expression]): Unit = joinType match { - case _: InnerLike => - case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti => - if (condition.isDefined) { - meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") - } - case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") - } - - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb - } -} - -trait GpuHashJoin extends GpuExec with HashJoin { - - override def output: Seq[Attribute] = { - joinType match { - case _: InnerLike => - left.output ++ right.output - case LeftOuter => - left.output ++ right.output.map(_.withNullability(true)) - case RightOuter => - left.output.map(_.withNullability(true)) ++ right.output - case j: ExistenceJoin => - left.output :+ j.exists - case LeftExistence(_) => - left.output - case FullOuter => - left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) - case x => - throw new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") - } - } - - // If we have a single batch streamed in then we will produce a single batch of output - // otherwise it can get smaller or bigger, we just don't know. 
When we support out of - // core joins this will change - override def outputBatching: CoalesceGoal = { - val batching = buildSide match { - case BuildLeft => GpuExec.outputBatching(right) - case BuildRight => GpuExec.outputBatching(left) - } - if (batching == RequireSingleBatch) { - RequireSingleBatch - } else { - null - } - } - - protected lazy val (gpuBuildKeys, gpuStreamedKeys) = { - require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType), - "Join keys from two sides should have same types") - val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) - val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) - buildSide match { - case BuildLeft => (lkeys, rkeys) - case BuildRight => (rkeys, lkeys) - } - } - - /** - * Place the columns in left and the columns in right into a single ColumnarBatch - */ - def combine(left: ColumnarBatch, right: ColumnarBatch): ColumnarBatch = { - val l = GpuColumnVector.extractColumns(left) - val r = GpuColumnVector.extractColumns(right) - val c = l ++ r - new ColumnarBatch(c.asInstanceOf[Array[ColumnVector]], left.numRows()) - } - - // TODO eventually dedupe the keys - lazy val joinKeyIndices: Range = gpuBuildKeys.indices - - val localBuildOutput: Seq[Attribute] = buildPlan.output - // The first columns are the ones we joined on and need to remove - lazy val joinIndices: Seq[Int] = joinType match { - case RightOuter => - // The left table and right table are switched in the output - // because we don't support a right join, only left - val numRight = right.output.length - val numLeft = left.output.length - val joinLength = joinKeyIndices.length - def remap(index: Int): Int = { - if (index < numLeft) { - // part of the left table, but is on the right side of the tmp output - index + joinLength + numRight - } else { - // part of the right table, but is on the left side of the tmp output - index + joinLength - numLeft - } - } - output.indices.map (remap) - case _ => - val joinLength = joinKeyIndices.length - output.indices.map (v => v + joinLength) - } - - // Spark adds in rules to filter out nulls for some types of joins, but it does not - // guarantee 100% that all nulls will be filtered out by the time they get to - // this point, but because of https://github.com/rapidsai/cudf/issues/6052 - // we need to filter out the nulls ourselves until it is fixed. 
- // InnerLike | LeftSemi => - // filter left and right keys - // RightOuter => - // filter left keys - // LeftOuter | LeftAnti => - // filter right keys - - private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { - val builtAnyNullable = gpuBuildKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable - case _ => false - } - } - - private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { - val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable - case _ => false - } - } - - private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = - exprs.zipWithIndex.map { kv => - GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) - }.reduce(GpuAnd) - - private[this] lazy val builtTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuBuildKeys) - - private[this] lazy val streamedTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuStreamedKeys) - - /** - * Filter the builtBatch if needed. builtBatch will be closed. - */ - def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = - if (shouldFilterBuiltTableForNulls) { - GpuFilter(builtBatch, builtTableNullFilterExpression) - } else { - builtBatch - } - - private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = - if (shouldFilterStreamTableForNulls) { - GpuFilter(streamedBatch, streamedTableNullFilterExpression) - } else { - streamedBatch - } - - def doJoin(builtTable: Table, - stream: Iterator[ColumnarBatch], - boundCondition: Option[Expression], - numOutputRows: SQLMetric, - joinOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - streamTime: SQLMetric, - joinTime: SQLMetric, - filterTime: SQLMetric, - totalTime: SQLMetric): Iterator[ColumnarBatch] = { - new Iterator[ColumnarBatch] { - import scala.collection.JavaConverters._ - var nextCb: Option[ColumnarBatch] = None - var first: Boolean = true - - TaskContext.get().addTaskCompletionListener[Unit](_ => closeCb()) - - def closeCb(): Unit = { - nextCb.foreach(_.close()) - nextCb = None - } - - override def hasNext: Boolean = { - var mayContinue = true - while (nextCb.isEmpty && mayContinue) { - val startTime = System.nanoTime() - if (stream.hasNext) { - val cb = stream.next() - streamTime += (System.nanoTime() - startTime) - nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, - numOutputBatches, joinTime, filterTime) - totalTime += (System.nanoTime() - startTime) - } else if (first) { - // We have to at least try one in some cases - val cb = GpuColumnVector.emptyBatch(streamedPlan.output.asJava) - streamTime += (System.nanoTime() - startTime) - nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, - numOutputBatches, joinTime, filterTime) - totalTime += (System.nanoTime() - startTime) - } else { - mayContinue = false - } - first = false - } - nextCb.isDefined - } - - override def next(): ColumnarBatch = { - if (!hasNext) { - throw new NoSuchElementException() - } - val ret = nextCb.get - nextCb = None - ret - } - } - } - - private[this] def doJoin(builtTable: Table, - streamedBatch: ColumnarBatch, - boundCondition: Option[Expression], - 
numOutputRows: SQLMetric, - numJoinOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - joinTime: SQLMetric, - filterTime: SQLMetric): Option[ColumnarBatch] = { - - val combined = withResource(streamedBatch) { streamedBatch => - withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { - streamedKeysBatch => - GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) - } - } - val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => - GpuColumnVector.from(cb) - } - - val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) - val joined = try { - buildSide match { - case BuildLeft => doJoinLeftRight(builtTable, streamedTable) - case BuildRight => doJoinLeftRight(streamedTable, builtTable) - } - } finally { - streamedTable.close() - nvtxRange.close() - } - - numJoinOutputRows += joined.numRows() - - val tmp = if (boundCondition.isDefined) { - GpuFilter(joined, boundCondition.get, numOutputRows, numOutputBatches, filterTime) - } else { - numOutputRows += joined.numRows() - numOutputBatches += 1 - joined - } - if (tmp.numRows() == 0) { - // Not sure if there is a better way to work around this - numOutputBatches.set(numOutputBatches.value - 1) - tmp.close() - None - } else { - Some(tmp) - } - } - - private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = { - val joinedTable = joinType match { - case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) - .leftJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case RightOuter => rightTable.onColumns(joinKeyIndices: _*) - .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) - case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) - .innerJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) - .leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) - .leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case FullOuter => leftTable.onColumns(joinKeyIndices: _*) - .fullJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") - } - try { - val result = joinIndices.zip(output).map { case (joinIndex, outAttr) => - GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType) - }.toArray[ColumnVector] - - new ColumnarBatch(result, joinedTable.getRowCount.toInt) - } finally { - joinedTable.close() - } - } -} diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala index 54cae386fc9..3ea16da478f 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuShuffledHashJoinExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClustered import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.rapids.execution.GpuHashJoin import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -73,7 +74,7 @@ class GpuShuffledHashJoinMeta( 
leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, - join.buildSide, + GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), childPlans(0).convertIfNeeded(), childPlans(1).convertIfNeeded()) @@ -83,7 +84,7 @@ case class GpuShuffledHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { @@ -105,8 +106,8 @@ case class GpuShuffledHashJoinExec( } override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match { - case BuildLeft => Seq(RequireSingleBatch, null) - case BuildRight => Seq(null, RequireSingleBatch) + case GpuBuildLeft => Seq(RequireSingleBatch, null) + case GpuBuildRight => Seq(null, RequireSingleBatch) } override def doExecuteColumnar() : RDD[ColumnarBatch] = { diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala index 20b47c7ad48..a2e73344875 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/GpuSortMergeJoinExec.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.rapids.execution.GpuHashJoin import org.apache.spark.sql.types.DataType /** @@ -86,7 +87,7 @@ class GpuSortMergeJoinMeta( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, - buildSide, + GpuJoinUtils.getGpuBuildSide(buildSide), condition.map(_.convertToGpu()), childPlans(0).convertIfNeeded(), childPlans(1).convertIfNeeded()) diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala index b906d25d4ab..5b264efa871 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala @@ -60,13 +60,6 @@ class Spark301dbShims extends Spark301Shims { GpuBroadcastExchangeExec(mode, child) } - override def isGpuHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuHashJoin => true - case p => false - } - } - override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { plan match { case _: GpuBroadcastHashJoinExec => true diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala index 262f15a7384..9a8ae7766e6 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuBroadcastHashJoinExec.scala @@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.shims.spark301.GpuBroadcastExchangeExec import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow 
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution} import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, SerializeConcatHostBuffersDeserializeBatch} +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, SerializeConcatHostBuffersDeserializeBatch} import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -88,7 +88,8 @@ class GpuBroadcastHashJoinMeta( GpuBroadcastHashJoinExec( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), - join.joinType, join.buildSide, + join.joinType, + GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), left, right) } @@ -98,7 +99,7 @@ case class GpuBroadcastHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: SparkPlan) extends BinaryExecNode with GpuHashJoin { @@ -109,16 +110,12 @@ case class GpuBroadcastHashJoinExec( "joinTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "join time"), "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) - // Spark 3.1.0 started introducing ordering guarantees for hash joins but GPU - // hash joins do not provide ordering guarantees - override def outputOrdering: Seq[SortOrder] = Nil - override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildKeys) buildSide match { - case BuildLeft => + case GpuBuildLeft => BroadcastDistribution(mode) :: UnspecifiedDistribution :: Nil - case BuildRight => + case GpuBuildRight => UnspecifiedDistribution :: BroadcastDistribution(mode) :: Nil } } @@ -149,10 +146,15 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - // TODO clean up intermediate results... 
- val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) - val combined = combine(keys, broadcastRelation.value.batch) - val ret = GpuColumnVector.from(combined) + val ret = withResource( + GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => + val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) + val filtered = filterBuiltTableIfNeeded(combined) + withResource(filtered) { filtered => + GpuColumnVector.from(filtered) + } + } + // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala deleted file mode 100644 index f306260eee4..00000000000 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.nvidia.spark.rapids.shims.spark310 - -import ai.rapids.cudf.{NvtxColor, Table} -import com.nvidia.spark.rapids._ - -import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} -import org.apache.spark.sql.execution.joins.HashJoinWithoutCodegen -import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} - -object GpuHashJoin { - def tagJoin( - meta: RapidsMeta[_, _, _], - joinType: JoinType, - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - condition: Option[Expression]): Unit = joinType match { - case _: InnerLike => - case FullOuter | RightOuter | LeftOuter | LeftSemi | LeftAnti => - if (condition.isDefined) { - meta.willNotWorkOnGpu(s"$joinType joins currently do not support conditions") - } - case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") - } - - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb - } -} - -trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { - - override def output: Seq[Attribute] = { - joinType match { - case _: InnerLike => - left.output ++ right.output - case LeftOuter => - left.output ++ right.output.map(_.withNullability(true)) - case RightOuter => - left.output.map(_.withNullability(true)) ++ right.output - case j: ExistenceJoin => - left.output :+ j.exists - case LeftExistence(_) => - left.output - case FullOuter => - left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) - case x => - throw 
new IllegalArgumentException(s"GpuHashJoin should not take $x as the JoinType") - } - } - - // If we have a single batch streamed in then we will produce a single batch of output - // otherwise it can get smaller or bigger, we just don't know. When we support out of - // core joins this will change - override def outputBatching: CoalesceGoal = { - val batching = buildSide match { - case BuildLeft => GpuExec.outputBatching(right) - case BuildRight => GpuExec.outputBatching(left) - } - if (batching == RequireSingleBatch) { - RequireSingleBatch - } else { - null - } - } - - protected lazy val (gpuBuildKeys, gpuStreamedKeys) = { - require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType), - "Join keys from two sides should have same types") - val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) - val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) - buildSide match { - case BuildLeft => (lkeys, rkeys) - case BuildRight => (rkeys, lkeys) - } - } - - /** - * Place the columns in left and the columns in right into a single ColumnarBatch - */ - def combine(left: ColumnarBatch, right: ColumnarBatch): ColumnarBatch = { - val l = GpuColumnVector.extractColumns(left) - val r = GpuColumnVector.extractColumns(right) - val c = l ++ r - new ColumnarBatch(c.asInstanceOf[Array[ColumnVector]], left.numRows()) - } - - // TODO eventually dedupe the keys - lazy val joinKeyIndices: Range = gpuBuildKeys.indices - - val localBuildOutput: Seq[Attribute] = buildPlan.output - // The first columns are the ones we joined on and need to remove - lazy val joinIndices: Seq[Int] = joinType match { - case RightOuter => - // The left table and right table are switched in the output - // because we don't support a right join, only left - val numRight = right.output.length - val numLeft = left.output.length - val joinLength = joinKeyIndices.length - def remap(index: Int): Int = { - if (index < numLeft) { - // part of the left table, but is on the right side of the tmp output - index + joinLength + numRight - } else { - // part of the right table, but is on the left side of the tmp output - index + joinLength - numLeft - } - } - output.indices.map (remap) - case _ => - val joinLength = joinKeyIndices.length - output.indices.map (v => v + joinLength) - } - - // Spark adds in rules to filter out nulls for some types of joins, but it does not - // guarantee 100% that all nulls will be filtered out by the time they get to - // this point, but because of https://github.com/rapidsai/cudf/issues/6052 - // we need to filter out the nulls ourselves until it is fixed. 
- // InnerLike | LeftSemi => - // filter left and right keys - // RightOuter => - // filter left keys - // LeftOuter | LeftAnti => - // filter right keys - - private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { - val builtAnyNullable = gpuBuildKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable - case _ => false - } - } - - private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { - val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable - case _ => false - } - } - - private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = - exprs.zipWithIndex.map { kv => - GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) - }.reduce(GpuAnd) - - private[this] lazy val builtTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuBuildKeys) - - private[this] lazy val streamedTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuStreamedKeys) - - /** - * Filter the builtBatch if needed. builtBatch will be closed. - */ - def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = - if (shouldFilterBuiltTableForNulls) { - GpuFilter(builtBatch, builtTableNullFilterExpression) - } else { - builtBatch - } - - private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch = - if (shouldFilterStreamTableForNulls) { - GpuFilter(streamedBatch, streamedTableNullFilterExpression) - } else { - streamedBatch - } - - def doJoin(builtTable: Table, - stream: Iterator[ColumnarBatch], - boundCondition: Option[Expression], - numOutputRows: SQLMetric, - joinOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - streamTime: SQLMetric, - joinTime: SQLMetric, - filterTime: SQLMetric, - totalTime: SQLMetric): Iterator[ColumnarBatch] = { - new Iterator[ColumnarBatch] { - import scala.collection.JavaConverters._ - var nextCb: Option[ColumnarBatch] = None - var first: Boolean = true - - TaskContext.get().addTaskCompletionListener[Unit](_ => closeCb()) - - def closeCb(): Unit = { - nextCb.foreach(_.close()) - nextCb = None - } - - override def hasNext: Boolean = { - var mayContinue = true - while (nextCb.isEmpty && mayContinue) { - val startTime = System.nanoTime() - if (stream.hasNext) { - val cb = stream.next() - streamTime += (System.nanoTime() - startTime) - nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, - numOutputBatches, joinTime, filterTime) - totalTime += (System.nanoTime() - startTime) - } else if (first) { - // We have to at least try one in some cases - val cb = GpuColumnVector.emptyBatch(streamedPlan.output.asJava) - streamTime += (System.nanoTime() - startTime) - nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, - numOutputBatches, joinTime, filterTime) - totalTime += (System.nanoTime() - startTime) - } else { - mayContinue = false - } - first = false - } - nextCb.isDefined - } - - override def next(): ColumnarBatch = { - if (!hasNext) { - throw new NoSuchElementException() - } - val ret = nextCb.get - nextCb = None - ret - } - } - } - - private[this] def doJoin(builtTable: Table, - streamedBatch: ColumnarBatch, - boundCondition: Option[Expression], - 
numOutputRows: SQLMetric, - numJoinOutputRows: SQLMetric, - numOutputBatches: SQLMetric, - joinTime: SQLMetric, - filterTime: SQLMetric): Option[ColumnarBatch] = { - - val combined = withResource(streamedBatch) { streamedBatch => - withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) { - streamedKeysBatch => - GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch)) - } - } - val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb => - GpuColumnVector.from(cb) - } - - val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) - val joined = try { - buildSide match { - case BuildLeft => doJoinLeftRight(builtTable, streamedTable) - case BuildRight => doJoinLeftRight(streamedTable, builtTable) - } - } finally { - streamedTable.close() - nvtxRange.close() - } - - numJoinOutputRows += joined.numRows() - - val tmp = if (boundCondition.isDefined) { - GpuFilter(joined, boundCondition.get, numOutputRows, numOutputBatches, filterTime) - } else { - numOutputRows += joined.numRows() - numOutputBatches += 1 - joined - } - if (tmp.numRows() == 0) { - // Not sure if there is a better way to work around this - numOutputBatches.set(numOutputBatches.value - 1) - tmp.close() - None - } else { - Some(tmp) - } - } - - private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = { - val joinedTable = joinType match { - case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) - .leftJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case RightOuter => rightTable.onColumns(joinKeyIndices: _*) - .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) - case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) - .innerJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) - .leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) - .leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case FullOuter => leftTable.onColumns(joinKeyIndices: _*) - .fullJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") - } - try { - val result = joinIndices.zip(output).map { case (joinIndex, outAttr) => - GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType) - }.toArray[ColumnVector] - - new ColumnarBatch(result, joinedTable.getRowCount.toInt) - } finally { - joinedTable.close() - } - } -} diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala index 9b25b708f89..065cb0751f7 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{Distribution, HashClustered import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.rapids.execution.GpuShuffledHashJoinBase +import org.apache.spark.sql.rapids.execution.{GpuHashJoin, GpuShuffledHashJoinBase} import org.apache.spark.sql.types.DataType import 
org.apache.spark.sql.vectorized.ColumnarBatch @@ -74,7 +74,7 @@ class GpuShuffledHashJoinMeta( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, - join.buildSide, + GpuJoinUtils.getGpuBuildSide(join.buildSide), condition.map(_.convertToGpu()), childPlans(0).convertIfNeeded(), childPlans(1).convertIfNeeded(), @@ -85,7 +85,7 @@ case class GpuShuffledHashJoinExec( leftKeys: Seq[Expression], rightKeys: Seq[Expression], joinType: JoinType, - buildSide: BuildSide, + buildSide: GpuBuildSide, condition: Option[Expression], left: SparkPlan, right: SparkPlan, @@ -108,10 +108,6 @@ case class GpuShuffledHashJoinExec( "joinOutputRows" -> SQLMetrics.createMetric(sparkContext, "join output rows"), "filterTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "filter time")) - // Spark 3.1.0 started introducing ordering guarantees for hash joins but GPU - // hash joins do not provide ordering guarantees - override def outputOrdering: Seq[SortOrder] = Nil - override def requiredChildDistribution: Seq[Distribution] = HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil @@ -121,8 +117,8 @@ case class GpuShuffledHashJoinExec( } override def childrenCoalesceGoal: Seq[CoalesceGoal] = buildSide match { - case BuildLeft => Seq(RequireSingleBatch, null) - case BuildRight => Seq(null, RequireSingleBatch) + case GpuBuildLeft => Seq(RequireSingleBatch, null) + case GpuBuildRight => Seq(null, RequireSingleBatch) } override def doExecuteColumnar() : RDD[ColumnarBatch] = { diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala index 9ebf5fc4447..2a78793c833 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuSortMergeJoinExec.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SortExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.rapids.execution.GpuHashJoin import org.apache.spark.sql.types.DataType /** @@ -86,7 +87,7 @@ class GpuSortMergeJoinMeta( leftKeys.map(_.convertToGpu()), rightKeys.map(_.convertToGpu()), join.joinType, - buildSide, + GpuJoinUtils.getGpuBuildSide(buildSide), condition.map(_.convertToGpu()), childPlans(0).convertIfNeeded(), childPlans(1).convertIfNeeded(), diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala index 274dd0b5c54..f372e0cb89a 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala @@ -78,13 +78,6 @@ class Spark310Shims extends Spark301Shims { GpuBroadcastNestedLoopJoinExec(left, right, join, joinType, condition, targetSizeBytes) } - override def isGpuHashJoin(plan: SparkPlan): Boolean = { - plan match { - case _: GpuHashJoin => true - case _ => false - } - } - override def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean = { plan match { case _: GpuBroadcastHashJoinExec => true diff --git 
a/shims/spark310/src/main/scala/org/apache/spark/sql/execution/joins/HashJoinWithoutCodegen.scala b/shims/spark310/src/main/scala/org/apache/spark/sql/execution/joins/HashJoinWithoutCodegen.scala deleted file mode 100644 index 7a3b97755e6..00000000000 --- a/shims/spark310/src/main/scala/org/apache/spark/sql/execution/joins/HashJoinWithoutCodegen.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.joins - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext - -/** - * This trait is used to implement a hash join that does not support codegen. - * It is in the org.apache.spark.sql.execution.joins package to access the - * `HashedRelationInfo` class which is private to that package but needed by - * any class implementing `HashJoin`. - */ -trait HashJoinWithoutCodegen extends HashJoin { - override def supportCodegen: Boolean = false - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - throw new UnsupportedOperationException("inputRDDs is used by codegen which we don't support") - } - - protected override def prepareRelation(ctx: CodegenContext): HashedRelationInfo = { - throw new UnsupportedOperationException( - "prepareRelation is used by codegen which is not supported for this join") - } -} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 985fdccdae6..142ac5bbcc1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv} -import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuShuffleExchangeExecBase} +import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase} /** * Rules that run after the row to columnar and columnar to row transitions have been inserted. @@ -350,7 +350,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { // intermediate nodes that have a specified sort order. 
This helps with the size of // Parquet and Orc files plan match { - case s if ShimLoader.getSparkShims.isGpuHashJoin(s) => + case _: GpuHashJoin => val sortOrder = getOptimizedSortOrder(plan) GpuSortExec(sortOrder, false, plan, TargetSize(conf.gpuTargetBatchSizeBytes)) case _: GpuHashAggregateExec => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index cd93da54081..7d468b4f8df 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -62,7 +62,6 @@ case class EMRShimVersion(major: Int, minor: Int, patch: Int) extends ShimVersio trait SparkShims { def getSparkShimVersion: ShimVersion - def isGpuHashJoin(plan: SparkPlan): Boolean def isGpuBroadcastHashJoin(plan: SparkPlan): Boolean def isGpuShuffledHashJoin(plan: SparkPlan): Boolean def isBroadcastExchangeLike(plan: SparkPlan): Boolean diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala similarity index 85% rename from shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index 5ff7f2f27e9..e7697628cd2 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,15 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.nvidia.spark.rapids.shims.spark300 +package org.apache.spark.sql.rapids.execution import ai.rapids.cudf.{NvtxColor, Table} -import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBoundReference, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch} import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} -import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} @@ -47,7 +47,31 @@ object GpuHashJoin { } } -trait GpuHashJoin extends GpuExec with HashJoin { +trait GpuHashJoin extends GpuExec { + def left: SparkPlan + def right: SparkPlan + def joinType: JoinType + def condition: Option[Expression] + def leftKeys: Seq[Expression] + def rightKeys: Seq[Expression] + def buildSide: GpuBuildSide + + protected lazy val (buildPlan, streamedPlan) = buildSide match { + case GpuBuildLeft => (left, right) + case GpuBuildRight => (right, left) + } + + protected lazy val (buildKeys, streamedKeys) = { + require(leftKeys.length == rightKeys.length && + leftKeys.map(_.dataType) + .zip(rightKeys.map(_.dataType)) + .forall(types => types._1.sameType(types._2)), + "Join keys from two sides should have same length and types") + buildSide match { + case GpuBuildLeft => (leftKeys, rightKeys) + case GpuBuildRight => (rightKeys, leftKeys) + } + } override def output: Seq[Attribute] = { joinType match { @@ -73,8 +97,8 @@ trait GpuHashJoin extends GpuExec with HashJoin { // core joins this will change override def outputBatching: CoalesceGoal = { val batching = buildSide match { - case BuildLeft => GpuExec.outputBatching(right) - case BuildRight => GpuExec.outputBatching(left) + case GpuBuildLeft => GpuExec.outputBatching(right) + case GpuBuildRight => GpuExec.outputBatching(left) } if (batching == RequireSingleBatch) { RequireSingleBatch @@ -89,8 +113,8 @@ trait GpuHashJoin extends GpuExec with HashJoin { val lkeys = GpuBindReferences.bindGpuReferences(leftKeys, left.output) val rkeys = GpuBindReferences.bindGpuReferences(rightKeys, right.output) buildSide match { - case BuildLeft => (lkeys, rkeys) - case BuildRight => (rkeys, lkeys) + case GpuBuildLeft => (lkeys, rkeys) + case GpuBuildRight => (rkeys, lkeys) } } @@ -146,8 +170,8 @@ trait GpuHashJoin extends GpuExec with HashJoin { val builtAnyNullable = gpuBuildKeys.exists(_.nullable) (joinType, buildSide) match { case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable + case (RightOuter, GpuBuildLeft) => builtAnyNullable + case (LeftOuter | LeftAnti, GpuBuildRight) => builtAnyNullable case _ => false } } @@ -156,8 +180,8 @@ trait GpuHashJoin extends GpuExec with HashJoin { val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) (joinType, buildSide) match { case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable + case 
(RightOuter, GpuBuildRight) => streamedAnyNullable + case (LeftOuter | LeftAnti, GpuBuildLeft) => streamedAnyNullable case _ => false } } @@ -270,8 +294,8 @@ trait GpuHashJoin extends GpuExec with HashJoin { val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) val joined = try { buildSide match { - case BuildLeft => doJoinLeftRight(builtTable, streamedTable) - case BuildRight => doJoinLeftRight(streamedTable, builtTable) + case GpuBuildLeft => doJoinLeftRight(builtTable, streamedTable) + case GpuBuildRight => doJoinLeftRight(streamedTable, builtTable) } } finally { streamedTable.close() @@ -324,4 +348,5 @@ trait GpuHashJoin extends GpuExec with HashJoin { joinedTable.close() } } + } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala index 27038eeeb24..b661d36ea17 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala @@ -19,8 +19,8 @@ package com.nvidia.spark.rapids import com.nvidia.spark.rapids.TestUtils.{findOperator, findOperators} import org.apache.spark.SparkConf -import org.apache.spark.sql.execution.joins.HashJoin import org.apache.spark.sql.functions.broadcast +import org.apache.spark.sql.rapids.execution.GpuHashJoin class BroadcastHashJoinSuite extends SparkQueryCompareTestSuite { @@ -61,15 +61,13 @@ class BroadcastHashJoinSuite extends SparkQueryCompareTestSuite { plan1.collect() val finalPlan1 = findOperator(plan1.queryExecution.executedPlan, ShimLoader.getSparkShims.isGpuBroadcastHashJoin) - assert(ShimLoader.getSparkShims.getBuildSide - (finalPlan1.get.asInstanceOf[HashJoin]).toString == "GpuBuildLeft") + assert(finalPlan1.get.asInstanceOf[GpuHashJoin].buildSide == GpuBuildLeft) // execute the plan so that the final adaptive plan is available when AQE is on plan2.collect() val finalPlan2 = findOperator(plan2.queryExecution.executedPlan, ShimLoader.getSparkShims.isGpuBroadcastHashJoin) - assert(ShimLoader.getSparkShims. - getBuildSide(finalPlan2.get.asInstanceOf[HashJoin]).toString == "GpuBuildRight") + assert(finalPlan2.get.asInstanceOf[GpuHashJoin].buildSide == GpuBuildRight) } }) }
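
Note for readers of this patch: the code below is a minimal, hypothetical Scala sketch of the shim-level build-side translation the refactor relies on; it is not part of the commit. GpuBuildSide, GpuBuildLeft, GpuBuildRight and GpuJoinUtils.getGpuBuildSide are names referenced in the hunks above, but their definitions are not shown in the diff, and SparkBuildSide here is only an illustrative stand-in for Spark's BuildSide (which the spark300 shim imports from org.apache.spark.sql.execution.joins and the spark301db/spark310 shims import from org.apache.spark.sql.catalyst.optimizer).

// Illustrative sketch only; real definitions live in com.nvidia.spark.rapids
// and in each shim module.
sealed trait SparkBuildSide                 // stand-in for Spark's BuildSide
case object SparkBuildLeft extends SparkBuildSide
case object SparkBuildRight extends SparkBuildSide

sealed trait GpuBuildSide                   // plugin-owned, Spark-version independent
case object GpuBuildLeft extends GpuBuildSide
case object GpuBuildRight extends GpuBuildSide

object GpuJoinUtils {
  // Each shim converts its Spark version's BuildSide into the plugin's own type,
  // so the shared GpuHashJoin trait and the join execs can match on GpuBuildLeft /
  // GpuBuildRight without extending Spark's HashJoin.
  def getGpuBuildSide(buildSide: SparkBuildSide): GpuBuildSide = buildSide match {
    case SparkBuildLeft  => GpuBuildLeft
    case SparkBuildRight => GpuBuildRight
  }
}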