Support for LeftOuter/BuildRight and RightOuter/BuildLeft nested loop joins #3242

Merged 4 commits on Aug 20, 2021
Changes from 1 commit
2 changes: 1 addition & 1 deletion docs/configs.md
@@ -338,7 +338,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.BroadcastExchangeExec"></a>spark.rapids.sql.exec.BroadcastExchangeExec|The backend for broadcast exchange of data|true|None|
<a name="sql.exec.ShuffleExchangeExec"></a>spark.rapids.sql.exec.ShuffleExchangeExec|The backend for most data being exchanged between processes|true|None|
<a name="sql.exec.BroadcastHashJoinExec"></a>spark.rapids.sql.exec.BroadcastHashJoinExec|Implementation of join using broadcast data|true|None|
<a name="sql.exec.BroadcastNestedLoopJoinExec"></a>spark.rapids.sql.exec.BroadcastNestedLoopJoinExec|Implementation of join using brute force|true|None|
<a name="sql.exec.BroadcastNestedLoopJoinExec"></a>spark.rapids.sql.exec.BroadcastNestedLoopJoinExec|Implementation of join using brute force. Full outer joins and joins where the broadcast side matches the join side (e.g.: LeftOuter with left broadcast) are not supported. A non-inner join only is supported if the join condition expression can be converted to a GPU AST expression|true|None|
<a name="sql.exec.CartesianProductExec"></a>spark.rapids.sql.exec.CartesianProductExec|Implementation of join using brute force|true|None|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
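To illustrate the join shapes this change enables, here is a hedged spark-shell sketch; the DataFrame names and data are hypothetical, and spark is the usual shell session:

import org.apache.spark.sql.functions.broadcast

// A non-equi condition forces Spark to plan a nested loop join.
val facts = spark.range(0, 1000000L).toDF("a")
val dims = spark.range(0, 100L).toDF("b")

// LeftOuter with the broadcast on the right side: eligible for the GPU after this
// change, provided the condition (a simple comparison here) can be converted to a
// GPU AST expression.
val joined = facts.join(broadcast(dims), facts("a") < dims("b"), "left_outer")

// The mirrored case, RightOuter with the broadcast on the left side, is handled the
// same way. LeftOuter with a left-side broadcast remains unsupported and falls back
// to the CPU.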
2 changes: 1 addition & 1 deletion docs/supported_ops.md
@@ -661,7 +661,7 @@ Accelerator supports are described below.
</tr>
<tr>
<td>BroadcastNestedLoopJoinExec</td>
<td>Implementation of join using brute force</td>
<td>Implementation of join using brute force. Full outer joins and joins where the broadcast side matches the join side (e.g.: LeftOuter with left broadcast) are not supported. A non-inner join is supported only if the join condition expression can be converted to a GPU AST expression</td>
<td>None</td>
<td>S</td>
<td>S</td>
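If the GPU replacement for this exec needs to be disabled, for example to compare against the CPU plan, the spark.rapids.sql.exec.BroadcastNestedLoopJoinExec setting from docs/configs.md above can be turned off; a minimal spark-shell sketch:

// Force BroadcastNestedLoopJoinExec back onto the CPU (the default is true).
spark.conf.set("spark.rapids.sql.exec.BroadcastNestedLoopJoinExec", "false")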
121 changes: 93 additions & 28 deletions integration_tests/src/main/python/join_test.py

Large diffs are not rendered by default.

@@ -0,0 +1,311 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.collection.mutable

import ai.rapids.cudf.{GatherMap, NvtxColor}
import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Base class for iterators producing the results of a join.
* @param gatherNvtxName name to use for the NVTX range when producing the join gather maps
* @param targetSize configured target batch size in bytes
* @param joinTime metric to record GPU time spent in join
* @param totalTime metric to record total time in the iterator
*/
abstract class AbstractGpuJoinIterator(
gatherNvtxName: String,
targetSize: Long,
joinTime: GpuMetric,
totalTime: GpuMetric) extends Iterator[ColumnarBatch] with Arm with AutoCloseable {
private[this] var nextCb: Option[ColumnarBatch] = None
private[this] var gathererStore: Option[JoinGatherer] = None

protected[this] var closed = false

TaskContext.get().addTaskCompletionListener[Unit](_ => close())

/** Returns whether there are any more batches on the stream side of the join */
protected def hasNextStreamBatch: Boolean

/**
* Called to set up the next join gatherer instance when the previous instance is done or
* there is no previous instance.
* @param startNanoTime system nanoseconds timestamp at the top of the iterator loop, useful for
* calculating the time spent producing the next stream batch
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
protected def setupNextGatherer(startNanoTime: Long): Option[JoinGatherer]

override def hasNext: Boolean = {
if (closed) {
return false
}
var mayContinue = true
while (nextCb.isEmpty && mayContinue) {
val startNanoTime = System.nanoTime()
if (gathererStore.exists(!_.isDone)) {
nextCb = nextCbFromGatherer()
} else if (hasNextStreamBatch) {
// Need to refill the gatherer
gathererStore.foreach(_.close())
gathererStore = None
gathererStore = setupNextGatherer(startNanoTime)
nextCb = nextCbFromGatherer()
} else {
mayContinue = false
}
totalTime += (System.nanoTime() - startNanoTime)
}
if (nextCb.isEmpty) {
// Nothing is left to return so close ASAP.
close()
}
nextCb.isDefined
}

override def next(): ColumnarBatch = {
if (!hasNext) {
throw new NoSuchElementException()
}
val ret = nextCb.get
nextCb = None
ret
}

override def close(): Unit = {
if (!closed) {
nextCb.foreach(_.close())
nextCb = None
gathererStore.foreach(_.close())
gathererStore = None
closed = true
}
}

private def nextCbFromGatherer(): Option[ColumnarBatch] = {
withResource(new NvtxWithMetrics(gatherNvtxName, NvtxColor.DARK_GREEN, joinTime)) { _ =>
val ret = gathererStore.map { gather =>
val nextRows = JoinGatherer.getRowsInNextBatch(gather, targetSize)
gather.gatherNext(nextRows)
}
if (gathererStore.exists(_.isDone)) {
gathererStore.foreach(_.close())
gathererStore = None
}

if (ret.isDefined) {
// We are about to return something. We got everything we need from it so now let it spill
// if there is more to be gathered later on.
gathererStore.foreach(_.allowSpilling())
}
ret
}
}
}

/**
* Base class for join iterators that split and spill batches to avoid GPU OOM errors.
* @param gatherNvtxName name to use for the NVTX range when producing the join gather maps
* @param stream iterator to produce the batches for the streaming side input of the join
* @param streamAttributes attributes corresponding to the streaming side input
* @param builtBatch batch for the built side input of the join
* @param targetSize configured target batch size in bytes
* @param spillCallback callback to use when spilling
* @param joinTime metric to record GPU time spent in join
* @param streamTime metric to record time spent producing streaming side batches
* @param totalTime metric to record total time in the iterator
*/
abstract class SplittableJoinIterator(
gatherNvtxName: String,
stream: Iterator[LazySpillableColumnarBatch],
streamAttributes: Seq[Attribute],
builtBatch: LazySpillableColumnarBatch,
targetSize: Long,
spillCallback: SpillCallback,
joinTime: GpuMetric,
streamTime: GpuMetric,
totalTime: GpuMetric)
extends AbstractGpuJoinIterator(
gatherNvtxName,
targetSize,
joinTime = joinTime,
totalTime = totalTime) with Logging {
// For some join types, even if there is no stream data, we might output something
private var isInitialJoin = true
// If the join explodes this holds batches from the stream side split into smaller pieces.
private val pendingSplits = mutable.Queue[SpillableColumnarBatch]()

protected def computeNumJoinRows(cb: ColumnarBatch): Long

/**
* Create a join gatherer.
* @param cb next column batch from the streaming side of the join
* @param numJoinRows if present, the number of join output rows computed for this batch
* @return some gatherer to use next or None if there is no next gatherer or the loop should try
* to build the gatherer again (e.g.: to skip a degenerate join result batch)
*/
protected def createGatherer(cb: ColumnarBatch, numJoinRows: Option[Long]): Option[JoinGatherer]

override def hasNextStreamBatch: Boolean = {
isInitialJoin || pendingSplits.nonEmpty || stream.hasNext
}

override def setupNextGatherer(startNanoTime: Long): Option[JoinGatherer] = {
val wasInitialJoin = isInitialJoin
isInitialJoin = false
if (pendingSplits.nonEmpty || stream.hasNext) {
val cb = if (pendingSplits.nonEmpty) {
withResource(pendingSplits.dequeue()) {
_.getColumnarBatch()
}
} else {
val batch = withResource(stream.next()) { lazyBatch =>
// TODO: Worth having a releaseBatch method for LazySpillableBatch?
GpuColumnVector.incRefCounts(lazyBatch.getBatch)
}
streamTime += (System.nanoTime() - startNanoTime)
batch
}
withResource(cb) { cb =>
val numJoinRows = computeNumJoinRows(cb)

// We want the gather maps size to be around the target size. There are two gather maps
// that are made up of ints, so compute how many rows on the stream side will produce the
// desired gather maps size.
val maxJoinRows = Math.max(1, targetSize / (2 * Integer.BYTES))
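      // For example (hypothetical numbers): with a 1 GiB target size,
      // maxJoinRows = 1073741824 / (2 * 4) = 134217728 join output rows per gather.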
if (numJoinRows > maxJoinRows && cb.numRows() > 1) {
// Need to split the batch to reduce the gather maps size. This takes a simplistic
// approach of assuming the data is uniformly distributed in the stream table.
val numSplits = Math.min(cb.numRows(),
Math.ceil(numJoinRows.toDouble / maxJoinRows).toInt)
splitAndSave(cb, numSplits)

// Return no gatherer so the outer loop will try again
return None
}

createGatherer(cb, Some(numJoinRows))
}
} else {
assert(wasInitialJoin)
import scala.collection.JavaConverters._
withResource(GpuColumnVector.emptyBatch(streamAttributes.asJava)) { cb =>
createGatherer(cb, None)
}
}
}

override def close(): Unit = {
if (!closed) {
super.close()
builtBatch.close()
pendingSplits.foreach(_.close())
pendingSplits.clear()
}
}

/**
* Split a stream-side input batch, making all splits spillable, and replace this batch with
* the splits in the stream-side input
* @param cb stream-side input batch to split
* @param numBatches number of splits to produce with approximately the same number of rows each
* @param oom a prior OOM exception that this will try to recover from by splitting
*/
protected def splitAndSave(
cb: ColumnarBatch,
numBatches: Int,
oom: Option[OutOfMemoryError] = None): Unit = {
val batchSize = cb.numRows() / numBatches
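    // For example (hypothetical numbers): 1,000,000 stream rows split into
    // 8 batches gives batchSize = 125000 rows per split.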
if (oom.isDefined && batchSize < 100) {
// We just need some kind of cutoff to not get stuck in a loop if the batches get to be too
// small but we want to at least give it a chance to work (mostly for tests where the
// targetSize can be set really small)
throw oom.get
}
val msg = s"Split stream batch into $numBatches batches of about $batchSize rows"
if (oom.isDefined) {
logWarning(s"OOM Encountered: $msg")
} else {
logInfo(msg)
}
val splits = withResource(GpuColumnVector.from(cb)) { tab =>
val splitIndexes = (1 until numBatches).map(num => num * batchSize)
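      // For example (hypothetical numbers): with numBatches = 8 and batchSize = 125000,
      // splitIndexes = 125000, 250000, ..., 875000.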
tab.contiguousSplit(splitIndexes: _*)
}
withResource(splits) { splits =>
val schema = GpuColumnVector.extractTypes(cb)
pendingSplits ++= splits.map { ct =>
SpillableColumnarBatch(ct, schema,
SpillPriorities.ACTIVE_ON_DECK_PRIORITY, spillCallback)
}
}
}

/**
* Create a join gatherer from gather maps.
* @param maps gather maps produced from a cudf join
* @param leftData batch corresponding to the left table in the join
* @param rightData batch corresponding to the right table in the join
* @return some gatherer or None if there are no rows to gather in this join batch
*/
protected def makeGatherer(
maps: Array[GatherMap],
leftData: LazySpillableColumnarBatch,
rightData: LazySpillableColumnarBatch): Option[JoinGatherer] = {
assert(maps.length > 0 && maps.length <= 2)
try {
val leftMap = maps.head
val rightMap = if (maps.length > 1) {
if (rightData.numCols == 0) {
// No data so don't bother with it
None
} else {
Some(maps(1))
}
} else {
None
}

val lazyLeftMap = LazySpillableGatherMap(leftMap, spillCallback, "left_map")
val gatherer = rightMap match {
case None =>
rightData.close()
JoinGatherer(lazyLeftMap, leftData)
case Some(right) =>
val lazyRightMap = LazySpillableGatherMap(right, spillCallback, "right_map")
JoinGatherer(lazyLeftMap, leftData, lazyRightMap, rightData)
}
if (gatherer.isDone) {
// Nothing matched...
gatherer.close()
None
} else {
Some(gatherer)
}
} finally {
maps.foreach(_.close())
}
}
}
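To make the SplittableJoinIterator contract above concrete, here is a hedged sketch of a subclass; it is not part of this PR, and ExampleCrossJoinIterator along with its trivial method bodies is hypothetical:

package com.nvidia.spark.rapids

import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical sketch: a cross-join-shaped iterator built on the contract above.
class ExampleCrossJoinIterator(
    stream: Iterator[LazySpillableColumnarBatch],
    streamAttributes: Seq[Attribute],
    builtBatch: LazySpillableColumnarBatch,
    targetSize: Long,
    spillCallback: SpillCallback,
    joinTime: GpuMetric,
    streamTime: GpuMetric,
    totalTime: GpuMetric)
  extends SplittableJoinIterator(
    "example cross join gather",
    stream,
    streamAttributes,
    builtBatch,
    targetSize,
    spillCallback,
    joinTime,
    streamTime,
    totalTime) {

  // A cross join pairs every stream row with every built row, so the join row
  // count is a simple product.
  override def computeNumJoinRows(cb: ColumnarBatch): Long =
    cb.numRows().toLong * builtBatch.getBatch.numRows()

  // Placeholder only: a real implementation would run the cudf join here to
  // produce gather maps and hand them to makeGatherer(maps, leftData, rightData).
  override def createGatherer(
      cb: ColumnarBatch,
      numJoinRows: Option[Long]): Option[JoinGatherer] = None
}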
@@ -3075,7 +3075,10 @@ object GpuOverrides {
TypeSig.DECIMAL_64 + TypeSig.STRUCT), TypeSig.all),
(exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)),
exec[BroadcastNestedLoopJoinExec](
"Implementation of join using brute force",
"Implementation of join using brute force. Full outer joins and joins where the " +
"broadcast side matches the join side (e.g.: LeftOuter with left broadcast) are not " +
"supported. A non-inner join only is supported if the join condition expression can " +
"be converted to a GPU AST expression",
ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64),
TypeSig.all),
@@ -21,7 +21,6 @@ import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

import ai.rapids.cudf.{JCudfSerialization, NvtxColor, NvtxRange}
import ai.rapids.cudf.ast
import com.nvidia.spark.rapids.{Arm, GpuBindReferences, GpuBuildLeft, GpuColumnVector, GpuExec, GpuExpression, GpuMetric, GpuSemaphore, LazySpillableColumnarBatch, MetricsLevel}
import com.nvidia.spark.rapids.RapidsBuffer.SpillCallback
import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -115,6 +114,7 @@ class GpuCartesianRDD(
sc: SparkContext,
boundCondition: Option[GpuExpression],
numFirstTableColumns: Int,
streamAttributes: Seq[Attribute],
spillCallback: SpillCallback,
targetSize: Long,
joinTime: GpuMetric,
@@ -186,9 +186,13 @@
}

GpuBroadcastNestedLoopJoinExecBase.nestedLoopJoin(
Cross, numFirstTableColumns, batch, streamIterator, targetSize, GpuBuildLeft,
boundCondition, spillCallback, numOutputRows, joinOutputRows, numOutputBatches,
joinTime, totalTime)
Cross, GpuBuildLeft, numFirstTableColumns, batch, streamIterator, streamAttributes,
targetSize, boundCondition, spillCallback,
numOutputRows = numOutputRows,
joinOutputRows = joinOutputRows,
numOutputBatches = numOutputBatches,
joinTime = joinTime,
totalTime = totalTime)
}
}

@@ -273,6 +277,7 @@ case class GpuCartesianProductExec(
new GpuCartesianRDD(sparkContext,
boundCondition,
numFirstTableColumns,
right.output,
spillCallback,
targetSizeBytes,
joinTime,