diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py
index affde3b4d3c..dc29e63fa8a 100644
--- a/integration_tests/src/main/python/join_test.py
+++ b/integration_tests/src/main/python/join_test.py
@@ -98,7 +98,6 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
-@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/323')
 def test_broadcast_nested_loop_join_special_case(data_gen):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala
index 4c8c5405d0c..6c49013aa5c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuCartesianProductExec.scala
@@ -178,6 +178,35 @@ class GpuCartesianRDD(
   }
 }
 
+object GpuNoColumnCrossJoin {
+  def divideIntoBatches(rowCounts: RDD[Long],
+      targetSizeBytes: Long,
+      numOutputRows: SQLMetric,
+      numOutputBatches: SQLMetric): RDD[ColumnarBatch] = {
+    // Hash aggregate explodes the rows out, so if we go too large
+    // it can blow up. The size of a Long is 8 bytes so we just go with
+    // that as our estimate, no nulls.
+    val maxRowCount = targetSizeBytes/8
+
+    def divideIntoBatches(rows: Long): Iterable[ColumnarBatch] = {
+      val numBatches = (rows + maxRowCount - 1)/maxRowCount
+      (0L until numBatches).map(i => {
+        val ret = new ColumnarBatch(new Array[ColumnVector](0))
+        if ((i + 1) * maxRowCount > rows) {
+          ret.setNumRows((rows - (i * maxRowCount)).toInt)
+        } else {
+          ret.setNumRows(maxRowCount.toInt)
+        }
+        numOutputRows += ret.numRows()
+        numOutputBatches += 1
+        ret
+      })
+    }
+
+    rowCounts.flatMap(divideIntoBatches)
+  }
+}
+
 case class GpuCartesianProductExec(
     left: SparkPlan,
     right: SparkPlan,
@@ -229,32 +258,16 @@ case class GpuCartesianProductExec(
         ret
       }
-      // Hash aggregate explodes the rows out, so if we go too large
-      // it can blow up. The size of a Long is 8 bytes so we just go with
-      // that as our estimate, no nulls.
-      val maxRowCount = targetSizeBytes/8
-
-      def divideIntoBatches(rows: Long): Iterable[ColumnarBatch] = {
-        val numBatches = (rows + maxRowCount - 1)/maxRowCount
-        (0L until numBatches).map(i => {
-          val ret = new ColumnarBatch(new Array[ColumnVector](0))
-          if ((i + 1) * maxRowCount > rows) {
-            ret.setNumRows((rows - (i * maxRowCount)).toInt)
-          } else {
-            ret.setNumRows(maxRowCount.toInt)
-          }
-          numOutputRows += ret.numRows()
-          numOutputBatches += 1
-          ret
-        })
-      }
       val l = left.executeColumnar().map(getRowCountAndClose)
       val r = right.executeColumnar().map(getRowCountAndClose)
 
       // TODO here too it would probably be best to avoid doing any re-computation
      // that happens with the built in cartesian, but writing another custom RDD
      // just for this use case is not worth it without an explicit use case.
-      val prods = l.cartesian(r).map(p => p._1 * p._2)
-      prods.flatMap(divideIntoBatches)
+      GpuNoColumnCrossJoin.divideIntoBatches(
+        l.cartesian(r).map(p => p._1 * p._2),
+        targetSizeBytes,
+        numOutputRows,
+        numOutputBatches)
     } else {
       new GpuCartesianRDD(sparkContext,
         boundCondition,
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index db3f35d24da..e0f0532ac1d 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.rapids.execution
-import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 @SerialVersionUID(100L)
 class SerializeConcatHostBuffersDeserializeBatch(
@@ -75,6 +75,8 @@ class SerializeConcatHostBuffersDeserializeBatch(
       } finally {
         empty.close()
       }
+    } else if (headers.head.getNumColumns == 0) {
+      JCudfSerialization.writeRowsToStream(out, numRows)
     } else {
       JCudfSerialization.writeConcatedStream(headers, buffers, out)
     }
@@ -83,12 +85,19 @@ class SerializeConcatHostBuffersDeserializeBatch(
   private def readObject(in: ObjectInputStream): Unit = {
     val range = new NvtxRange("DeserializeBatch", NvtxColor.PURPLE)
     try {
-      val tableInfo: JCudfSerialization.TableAndRowCountPair = JCudfSerialization.readTableFrom(in)
+      val tableInfo: JCudfSerialization.TableAndRowCountPair =
+        JCudfSerialization.readTableFrom(in)
       try {
         val table = tableInfo.getTable
-        // This is read as part of the broadcast join so we expect it to leak.
-        (0 until table.getNumberOfColumns).foreach(table.getColumn(_).noWarnLeakExpected())
-        this.batchInternal = GpuColumnVector.from(table)
+        if (table == null) {
+          val numRows = tableInfo.getNumRows
+          this.batchInternal = new ColumnarBatch(new Array[ColumnVector](0))
+          batchInternal.setNumRows(numRows.toInt)
+        } else {
+          // This is read as part of the broadcast join so we expect it to leak.
+          (0 until table.getNumberOfColumns).foreach(table.getColumn(_).noWarnLeakExpected())
+          this.batchInternal = GpuColumnVector.from(table)
+        }
       } finally {
         tableInfo.close()
       }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
index c120444b4b9..361dd3c3fe4 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastNestedLoopJoinExec.scala
@@ -17,9 +17,10 @@ package org.apache.spark.sql.rapids.execution
 
 import ai.rapids.cudf.{NvtxColor, Table}
-import com.nvidia.spark.rapids.{Arm, BaseExprMeta, ConfKeysAndIncompat, GpuBindReferences, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuOverrides, NvtxWithMetrics, RapidsConf, RapidsMeta, SparkPlanMeta}
+import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.GpuMetricNames.{NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, TOTAL_TIME}
 
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.rapids.GpuNoColumnCrossJoin
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class GpuBroadcastNestedLoopJoinMeta(
@@ -133,7 +135,7 @@ case class GpuBroadcastNestedLoopJoinExec(
     buildSide: BuildSide,
     joinType: JoinType,
     condition: Option[Expression],
-    targetSize: Long) extends BinaryExecNode with GpuExec {
+    targetSizeBytes: Long) extends BinaryExecNode with GpuExec {
 
   override protected def doExecute(): RDD[InternalRow] =
     throw new IllegalStateException("This should only be called from columnar")
@@ -199,26 +201,50 @@ case class GpuBroadcastNestedLoopJoinExec(
     val broadcastedRelation =
       broadcastExchange.executeColumnarBroadcast[SerializeConcatHostBuffersDeserializeBatch]()
 
-    lazy val builtTable: Table = {
-      withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
-        val ret = GpuColumnVector.from(broadcastedRelation.value.batch)
-        // Don't warn for a leak, because we cannot control when we are done with this
-        (0 until ret.getNumberOfColumns).foreach( i => {
-          val column = ret.getColumn(i)
-          column.noWarnLeakExpected()
-          buildDataSize += column.getDeviceMemorySize
-        })
+    if (output.isEmpty) {
+      assert(boundCondition.isEmpty)
+
+      lazy val buildCount: Long = {
+        withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
+          broadcastedRelation.value.batch.numRows()
+        }
+      }
+
+      def getRowCountAndClose(cb: ColumnarBatch): Long = {
+        val ret = cb.numRows()
+        cb.close()
+        GpuSemaphore.releaseIfNecessary(TaskContext.get())
         ret
       }
-    }
 
-    streamed.executeColumnar().mapPartitions { streamedIter =>
-      joinType match {
-        case _: InnerLike => GpuBroadcastNestedLoopJoinExec.innerLikeJoin(streamedIter,
-          builtTable, buildSide, boundCondition,
-          joinTime, joinOutputRows, numOutputRows, numOutputBatches, filterTime, totalTime)
-        case _ => throw new IllegalArgumentException(s"$joinType + $buildSide is not supported" +
-          s" and should be run on the CPU")
+      val counts = streamed.executeColumnar().map(getRowCountAndClose)
+      GpuNoColumnCrossJoin.divideIntoBatches(
+        counts.map(s => s * buildCount),
+        targetSizeBytes,
+        numOutputRows,
+        numOutputBatches)
+    } else {
+      lazy val builtTable: Table = {
+        withResource(new NvtxWithMetrics("build join table", NvtxColor.GREEN, buildTime)) { _ =>
+          val ret = GpuColumnVector.from(broadcastedRelation.value.batch)
+          // Don't warn for a leak, because we cannot control when we are done with this
+          (0 until ret.getNumberOfColumns).foreach(i => {
+            val column = ret.getColumn(i)
+            column.noWarnLeakExpected()
+            buildDataSize += column.getDeviceMemorySize
+          })
+          ret
+        }
+      }
+
+      streamed.executeColumnar().mapPartitions { streamedIter =>
+        joinType match {
+          case _: InnerLike => GpuBroadcastNestedLoopJoinExec.innerLikeJoin(streamedIter,
+            builtTable, buildSide, boundCondition,
+            joinTime, joinOutputRows, numOutputRows, numOutputBatches, filterTime, totalTime)
+          case _ => throw new IllegalArgumentException(s"$joinType + $buildSide is not supported" +
+            s" and should be run on the CPU")
+        }
       }
     }
   }