From 87d3d14338b4f029439bcd842bc483b688d9d04a Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 11 Sep 2020 14:53:45 -0500 Subject: [PATCH 1/2] Revert "Filter nulls from joins where possible to improve performance. (#594)" This reverts commit df00904433514930a2dbbdd8ae3ec9043dc3dc3d. --- .../src/main/python/tpcds_test.py | 16 +---- .../spark300/GpuBroadcastHashJoinExec.scala | 13 ++-- .../rapids/shims/spark300/GpuHashJoin.scala | 69 +------------------ .../spark300/GpuShuffledHashJoinExec.scala | 26 +++---- .../spark300db/GpuBroadcastHashJoinExec.scala | 13 ++-- .../rapids/shims/spark300db/GpuHashJoin.scala | 64 +---------------- .../spark300db/GpuShuffledHashJoinExec.scala | 26 +++---- .../spark301/GpuBroadcastHashJoinExec.scala | 13 ++-- .../rapids/shims/spark310/GpuHashJoin.scala | 64 +---------------- .../spark310/GpuShuffledHashJoinExec.scala | 26 +++---- .../spark/rapids/basicPhysicalOperators.scala | 34 +++++---- 11 files changed, 72 insertions(+), 292 deletions(-) diff --git a/integration_tests/src/main/python/tpcds_test.py b/integration_tests/src/main/python/tpcds_test.py index 5b8780af375..cc91b42bf5d 100644 --- a/integration_tests/src/main/python/tpcds_test.py +++ b/integration_tests/src/main/python/tpcds_test.py @@ -23,8 +23,8 @@ 'q30', 'q31', 'q32', 'q33', 'q34', 'q35', 'q36', 'q37', 'q38', 'q39a', 'q39b', 'q40', 'q41', 'q42', 'q43', 'q44', 'q45', 'q46', 'q47', 'q48', 'q49', 'q50', 'q51', 'q52', 'q53', 'q54', 'q55', 'q56', 'q57', 'q58', 'q59', - 'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q68', 'q69', - 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79', + 'q60', 'q61', 'q62', 'q63', 'q64', 'q65', 'q66', 'q67', 'q68', 'q69', + 'q70', 'q71', 'q72', 'q73', 'q74', 'q75', 'q76', 'q77', 'q78', 'q79', 'q80', 'q81', 'q82', 'q83', 'q84', 'q85', 'q86', 'q87', 'q88', 'q89', 'q90', 'q91', 'q92', 'q93', 'q94', 'q95', 'q96', 'q97', 'q98', 'q99', 'ss_max', 'ss_maxb'] @@ -35,17 +35,5 @@ @allow_non_gpu(any=True) @pytest.mark.parametrize('query', queries) def test_tpcds(tpcds, query): - assert_gpu_and_cpu_are_equal_collect( - lambda spark : tpcds.do_test_query(query), - conf={'spark.rapids.sql.variableFloatAgg.enabled': 'true'}) - -no_var_agg_queries = ['q67', 'q70'] - -@incompat -@ignore_order -@approximate_float -@allow_non_gpu(any=True) -@pytest.mark.parametrize('query', no_var_agg_queries) -def test_tpcds_no_var_agg(tpcds, query): assert_gpu_and_cpu_are_equal_collect( lambda spark : tpcds.do_test_query(query)) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala index 2293d9471a3..610b408d418 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuBroadcastHashJoinExec.scala @@ -137,15 +137,10 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - val filtered = filterBuiltTableIfNeeded(combined) - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } - + // TODO clean up intermediate results... 
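// Context for the hunk above (a sketch, not part of this commit): the block being
// removed closed the projected keys and the filtered build batch via withResource,
// while the restored code (note the TODO) leaves `keys` and `combined` to be
// reclaimed later. Assuming the plugin's Arm.withResource keeps its usual contract,
// the closing discipline the removed code relied on is simply:
def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V =
  try {
    block(resource)        // hand the resource to the caller's body
  } finally {
    resource.close()       // always release it, even if the body throws
  }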
+ val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) + val combined = combine(keys, broadcastRelation.value.batch) + val ret = GpuColumnVector.from(combined) // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala index 1b2686b6334..b3eb7a39fa4 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuHashJoin.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -40,11 +39,6 @@ object GpuHashJoin { } case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") } - - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb - } } trait GpuHashJoin extends GpuExec with HashJoin { @@ -116,63 +110,6 @@ trait GpuHashJoin extends GpuExec with HashJoin { output.indices.map (v => v + joinLength) } - // Spark adds in rules to filter out nulls for some types of joins, but it does not - // guarantee 100% that all nulls will be filtered out by the time they get to - // this point, but because of https://github.com/rapidsai/cudf/issues/6052 - // we need to filter out the nulls ourselves until it is fixed. - // InnerLike | LeftSemi => - // filter left and right keys - // RightOuter => - // filter left keys - // LeftOuter | LeftAnti => - // filter right keys - - private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { - val builtAnyNullable = gpuBuildKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable - case _ => false - } - } - - private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { - val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable - case _ => false - } - } - - private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = - exprs.zipWithIndex.map { kv => - GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) - }.reduce(GpuAnd) - - private[this] lazy val builtTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuBuildKeys) - - private[this] lazy val streamedTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuStreamedKeys) - - /** - * Filter the builtBatch if needed. builtBatch will be closed. 
- * @param builtBatch - * @return - */ - def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = - if (shouldFilterBuiltTableForNulls) { - GpuFilter(builtBatch, builtTableNullFilterExpression) - } else { - builtBatch - } - - private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch = - GpuFilter(streamedBatch, streamedTableNullFilterExpression) - def doJoin(builtTable: Table, stream: Iterator[ColumnarBatch], boundCondition: Option[Expression], @@ -197,11 +134,7 @@ trait GpuHashJoin extends GpuExec with HashJoin { override def hasNext: Boolean = { while (nextCb.isEmpty && (first || stream.hasNext)) { if (stream.hasNext) { - val cb = if (shouldFilterStreamTableForNulls) { - filterStreamedTable(stream.next()) - } else { - stream.next() - } + val cb = stream.next() val startTime = System.nanoTime() nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, numOutputBatches, joinTime, filterTime) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala index d208d715d58..a01125e6f5e 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffledHashJoinExec.scala @@ -129,20 +129,20 @@ case class GpuShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { var combinedSize = 0 - val startTime = System.nanoTime() - val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( - buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => - withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) - val filtered = filterBuiltTableIfNeeded(combined) - combinedSize = - GpuColumnVector.extractColumns(filtered) - .map(_.getBase.getDeviceMemorySize).sum.toInt - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } + val buildBatch = + ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, localBuildOutput) + val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) + val builtTable = try { + // Combine does not inc any reference counting + val combined = combine(keys, buildBatch) + combinedSize = + GpuColumnVector.extractColumns(combined) + .map(_.getBase.getDeviceMemorySize).sum.toInt + GpuColumnVector.from(combined) + } finally { + keys.close() + buildBatch.close() } val delta = System.nanoTime() - startTime diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala index a8307fe789c..a004e0fccb6 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuBroadcastHashJoinExec.scala @@ -138,15 +138,10 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - val filtered = 
filterBuiltTableIfNeeded(combined) - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } - + // TODO clean up intermediate results... + val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) + val combined = combine(keys, broadcastRelation.value.batch) + val ret = GpuColumnVector.from(combined) // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala index 2e59d034e1a..b80db78fb28 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuHashJoin.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.joins.HashJoin import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -41,11 +40,6 @@ object GpuHashJoin { } case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") } - - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb - } } trait GpuHashJoin extends GpuExec with HashJoin { @@ -117,58 +111,6 @@ trait GpuHashJoin extends GpuExec with HashJoin { output.indices.map (v => v + joinLength) } - // Spark adds in rules to filter out nulls for some types of joins, but it does not - // guarantee 100% that all nulls will be filtered out by the time they get to - // this point, but because of https://github.com/rapidsai/cudf/issues/6052 - // we need to filter out the nulls ourselves until it is fixed. 
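// Elaboration (a sketch, not from this patch): the removed helpers below build the
// null filter as a conjunction of IsNotNull over every join key, i.e. the SQL
// predicate  k1 IS NOT NULL AND k2 IS NOT NULL ...  The CPU-side Catalyst
// equivalent, with GpuIsNotNull / GpuAnd / GpuBoundReference swapped for their
// Spark counterparts, would look roughly like this:
import org.apache.spark.sql.catalyst.expressions.{And, BoundReference, Expression, IsNotNull}

def nullFilterExpr(keys: Seq[Expression]): Expression =
  keys.zipWithIndex
    .map { case (key, ordinal) =>
      IsNotNull(BoundReference(ordinal, key.dataType, key.nullable))
    }
    .reduce(And(_, _))   // assumes at least one join key, as the GPU version does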
- // InnerLike | LeftSemi => - // filter left and right keys - // RightOuter => - // filter left keys - // LeftOuter | LeftAnti => - // filter right keys - - private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { - val builtAnyNullable = gpuBuildKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable - case _ => false - } - } - - private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { - val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable - case _ => false - } - } - - private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = - exprs.zipWithIndex.map { kv => - GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) - }.reduce(GpuAnd) - - private[this] lazy val builtTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuBuildKeys) - - private[this] lazy val streamedTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuStreamedKeys) - - def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = - if (shouldFilterBuiltTableForNulls) { - GpuFilter(builtBatch, builtTableNullFilterExpression) - } else { - builtBatch - } - - private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch = - GpuFilter(streamedBatch, streamedTableNullFilterExpression) - def doJoin(builtTable: Table, stream: Iterator[ColumnarBatch], boundCondition: Option[Expression], @@ -193,11 +135,7 @@ trait GpuHashJoin extends GpuExec with HashJoin { override def hasNext: Boolean = { while (nextCb.isEmpty && (first || stream.hasNext)) { if (stream.hasNext) { - val cb = if (shouldFilterStreamTableForNulls) { - filterStreamedTable(stream.next()) - } else { - stream.next() - } + val cb = stream.next() val startTime = System.nanoTime() nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, numOutputBatches, joinTime, filterTime) diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala index 2aa4f83ad13..ad481219fc6 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/GpuShuffledHashJoinExec.scala @@ -118,20 +118,20 @@ case class GpuShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { var combinedSize = 0 - val startTime = System.nanoTime() - val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( - buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => - withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) - val filtered = filterBuiltTableIfNeeded(combined) - combinedSize = - GpuColumnVector.extractColumns(filtered) - .map(_.getBase.getDeviceMemorySize).sum.toInt - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } + val buildBatch = + ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, 
localBuildOutput) + val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) + val builtTable = try { + // Combine does not inc any reference counting + val combined = combine(keys, buildBatch) + combinedSize = + GpuColumnVector.extractColumns(combined) + .map(_.getBase.getDeviceMemorySize).sum.toInt + GpuColumnVector.from(combined) + } finally { + keys.close() + buildBatch.close() } val delta = System.nanoTime() - startTime diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala index 45d5c6e4303..ef46a37e832 100644 --- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala +++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/GpuBroadcastHashJoinExec.scala @@ -143,15 +143,10 @@ case class GpuBroadcastHashJoinExec( val boundCondition = condition.map(GpuBindReferences.bindReference(_, output)) lazy val builtTable = { - val ret = withResource( - GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch)) - val filtered = filterBuiltTableIfNeeded(combined) - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } - + // TODO clean up intermediate results... + val keys = GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys) + val combined = combine(keys, broadcastRelation.value.batch) + val ret = GpuColumnVector.from(combined) // Don't warn for a leak, because we cannot control when we are done with this (0 until ret.getNumberOfColumns).foreach(ret.getColumn(_).noWarnLeakExpected()) ret diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala index 4ed12d95727..27d10c5c0b3 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuHashJoin.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.joins.HashJoinWithoutCodegen import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -41,11 +40,6 @@ object GpuHashJoin { } case _ => meta.willNotWorkOnGpu(s"$joinType currently is not supported") } - - def incRefCount(cb: ColumnarBatch): ColumnarBatch = { - GpuColumnVector.extractBases(cb).foreach(_.incRefCount()) - cb - } } trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { @@ -117,58 +111,6 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { output.indices.map (v => v + joinLength) } - // Spark adds in rules to filter out nulls for some types of joins, but it does not - // guarantee 100% that all nulls will be filtered out by the time they get to - // this point, but because of https://github.com/rapidsai/cudf/issues/6052 - // we need to filter out the nulls ourselves until it is fixed. 
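// Elaboration (not part of this patch): dropping these rows is safe because a null
// join key never compares equal to anything, e.g. in
//   SELECT * FROM l JOIN r ON l.k = r.k
// a row with l.k IS NULL can never match. For an inner or left semi join that holds
// for both sides; for a left outer join the left side's null keys must be kept (they
// still produce a null-padded output row) while the right side's can be dropped,
// which is why the mapping below only filters the right keys in that case.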
- // InnerLike | LeftSemi => - // filter left and right keys - // RightOuter => - // filter left keys - // LeftOuter | LeftAnti => - // filter right keys - - private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = { - val builtAnyNullable = gpuBuildKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => builtAnyNullable - case (RightOuter, BuildLeft) => builtAnyNullable - case (LeftOuter | LeftAnti, BuildRight) => builtAnyNullable - case _ => false - } - } - - private[this] lazy val shouldFilterStreamTableForNulls: Boolean = { - val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable) - (joinType, buildSide) match { - case (_: InnerLike | LeftSemi, _) => streamedAnyNullable - case (RightOuter, BuildRight) => streamedAnyNullable - case (LeftOuter | LeftAnti, BuildLeft) => streamedAnyNullable - case _ => false - } - } - - private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression = - exprs.zipWithIndex.map { kv => - GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable)) - }.reduce(GpuAnd) - - private[this] lazy val builtTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuBuildKeys) - - private[this] lazy val streamedTableNullFilterExpression: GpuExpression = - mkNullFilterExpr(gpuStreamedKeys) - - def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch = - if (shouldFilterBuiltTableForNulls) { - GpuFilter(builtBatch, builtTableNullFilterExpression) - } else { - builtBatch - } - - private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch = - GpuFilter(streamedBatch, streamedTableNullFilterExpression) - def doJoin(builtTable: Table, stream: Iterator[ColumnarBatch], boundCondition: Option[Expression], @@ -193,11 +135,7 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen { override def hasNext: Boolean = { while (nextCb.isEmpty && (first || stream.hasNext)) { if (stream.hasNext) { - val cb = if (shouldFilterStreamTableForNulls) { - filterStreamedTable(stream.next()) - } else { - stream.next() - } + val cb = stream.next() val startTime = System.nanoTime() nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows, numOutputBatches, joinTime, filterTime) diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala index 0fe80ab402a..1d16e38d073 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuShuffledHashJoinExec.scala @@ -130,20 +130,20 @@ case class GpuShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => { var combinedSize = 0 - val startTime = System.nanoTime() - val builtTable = withResource(ConcatAndConsumeAll.getSingleBatchWithVerification( - buildIter, localBuildOutput)) { buildBatch: ColumnarBatch => - withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys => - val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch)) - val filtered = filterBuiltTableIfNeeded(combined) - combinedSize = - GpuColumnVector.extractColumns(filtered) - .map(_.getBase.getDeviceMemorySize).sum.toInt - withResource(filtered) { filtered => - GpuColumnVector.from(filtered) - } - } + val buildBatch = + ConcatAndConsumeAll.getSingleBatchWithVerification(buildIter, 
localBuildOutput) + val keys = GpuProjectExec.project(buildBatch, gpuBuildKeys) + val builtTable = try { + // Combine does not inc any reference counting + val combined = combine(keys, buildBatch) + combinedSize = + GpuColumnVector.extractColumns(combined) + .map(_.getBase.getDeviceMemorySize).sum.toInt + GpuColumnVector.from(combined) + } finally { + keys.close() + buildBatch.close() } val delta = System.nanoTime() - startTime diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index a783e905939..ae4c9f46614 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -94,34 +94,32 @@ case class GpuProjectExec(projectList: Seq[Expression], child: SparkPlan) /** * Run a filter on a batch. The batch will be consumed. */ -object GpuFilter extends Arm { +object GpuFilter { def apply( batch: ColumnarBatch, boundCondition: Expression, numOutputRows: SQLMetric, numOutputBatches: SQLMetric, filterTime: SQLMetric): ColumnarBatch = { - withResource(new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime)) { _ => - val filteredBatch = GpuFilter(batch, boundCondition) + val nvtxRange = new NvtxWithMetrics("filter batch", NvtxColor.YELLOW, filterTime) + try { + var filterConditionCv: GpuColumnVector = null + var tbl: cudf.Table = null + var filtered: cudf.Table = null + val filteredBatch = try { + filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector] + tbl = GpuColumnVector.from(batch) + filtered = tbl.filter(filterConditionCv.getBase) + GpuColumnVector.from(filtered) + } finally { + Seq(filtered, tbl, filterConditionCv, batch).safeClose() + } + numOutputBatches += 1 numOutputRows += filteredBatch.numRows() filteredBatch - } - } - - def apply( - batch: ColumnarBatch, - boundCondition: Expression) : ColumnarBatch = { - var filterConditionCv: GpuColumnVector = null - var tbl: cudf.Table = null - var filtered: cudf.Table = null - try { - filterConditionCv = boundCondition.columnarEval(batch).asInstanceOf[GpuColumnVector] - tbl = GpuColumnVector.from(batch) - filtered = tbl.filter(filterConditionCv.getBase) - GpuColumnVector.from(filtered) } finally { - Seq(filtered, tbl, filterConditionCv, batch).safeClose() + nvtxRange.close() } } } From d206a40fdc66331c57aaf2b01c44bfb587cd0f54 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 11 Sep 2020 15:00:40 -0500 Subject: [PATCH 2/2] Fix tests to run Signed-off-by: Robert (Bobby) Evans --- integration_tests/src/main/python/conftest.py | 2 +- .../rapids/tests/tpcds/TpcdsLikeSpark.scala | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/integration_tests/src/main/python/conftest.py b/integration_tests/src/main/python/conftest.py index 93c6aab2d0b..588738fa6db 100644 --- a/integration_tests/src/main/python/conftest.py +++ b/integration_tests/src/main/python/conftest.py @@ -347,7 +347,7 @@ def setup(self, spark): } if not self.tpcds_format in formats: raise RuntimeError("{} is not a supported tpcds input type".format(self.tpcds_format)) - formats.get(self.tpcds_format)(jvm_session,self.tpcds_path) + formats.get(self.tpcds_format)(jvm_session, self.tpcds_path, True) def do_test_query(self, query): spark = get_spark_i_know_what_i_am_doing() diff --git a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala 
b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala index 2af7e983263..1ca11599281 100644 --- a/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala +++ b/integration_tests/src/main/scala/com/nvidia/spark/rapids/tests/tpcds/TpcdsLikeSpark.scala @@ -35,19 +35,19 @@ case class Table( basePath + "/" + name + rest } - def readCSV(spark: SparkSession, basePath: String): DataFrame = + def readCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): DataFrame = spark.read.option("delimiter", "|") .schema(schema) - .csv(path(basePath)) + .csv(path(basePath, appendDat)) - def setupCSV(spark: SparkSession, basePath: String): Unit = - readCSV(spark, basePath).createOrReplaceTempView(name) + def setupCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = + readCSV(spark, basePath, appendDat).createOrReplaceTempView(name) def setupParquet(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = spark.read.parquet(path(basePath, appendDat)).createOrReplaceTempView(name) - def setupOrc(spark: SparkSession, basePath: String): Unit = - spark.read.orc(path(basePath)).createOrReplaceTempView(name) + def setupOrc(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = + spark.read.orc(path(basePath, appendDat)).createOrReplaceTempView(name) def setup( spark: SparkSession, @@ -137,16 +137,16 @@ object TpcdsLikeSpark { tables.foreach(_.csvToOrc(spark, baseInput, baseOutput, writePartitioning)) } - def setupAllCSV(spark: SparkSession, basePath: String): Unit = { - tables.foreach(_.setupCSV(spark, basePath)) + def setupAllCSV(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = { + tables.foreach(_.setupCSV(spark, basePath, appendDat)) } def setupAllParquet(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = { tables.foreach(_.setupParquet(spark, basePath, appendDat)) } - def setupAllOrc(spark: SparkSession, basePath: String): Unit = { - tables.foreach(_.setupOrc(spark, basePath)) + def setupAllOrc(spark: SparkSession, basePath: String, appendDat: Boolean = true): Unit = { + tables.foreach(_.setupOrc(spark, basePath, appendDat)) } def setupAll(