Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter nulls for left semi and left anti join to work around cudf #1664

Merged
merged 2 commits into from
Feb 4, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package org.apache.spark.sql.rapids.execution
jlowe marked this conversation as resolved.
Show resolved Hide resolved

import ai.rapids.cudf.{NvtxColor, Table}
import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBoundReference, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch}
import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch}

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
Expand Down Expand Up @@ -232,16 +231,23 @@ trait GpuHashJoin extends GpuExec {
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
val joined = try {
buildSide match {
case GpuBuildLeft => doJoinLeftRight(builtTable, streamedTable)
case GpuBuildRight => doJoinLeftRight(streamedTable, builtTable)
val joined =
withResource(new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)) { _ =>
// `doJoinLeftRight` closes the right table if the last argument (`closeRightTable`)
// is true, but never closes the left table.
buildSide match {
case GpuBuildLeft =>
// tell `doJoinLeftRight` it is ok to close the `streamedTable`, this can help
// in order to close temporary/intermediary data after a filter in some scenarios.
doJoinLeftRight(builtTable, streamedTable, true)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
case GpuBuildRight =>
// tell `doJoinLeftRight` to not close `builtTable`, as it is owned by our caller,
// here we close the left table as that one is never closed by `doJoinLeftRight`.
withResource(streamedTable) { _ =>
doJoinLeftRight(streamedTable, builtTable, false)
}
}
}
} finally {
streamedTable.close()
nvtxRange.close()
}

numJoinOutputRows += joined.numRows()

Expand All @@ -262,23 +268,76 @@ trait GpuHashJoin extends GpuExec {
}
}

private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = {
val joinedTable = joinType match {
case LeftOuter => leftTable.onColumns(joinKeyIndices: _*)
.leftJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case RightOuter => rightTable.onColumns(joinKeyIndices: _*)
.leftJoin(leftTable.onColumns(joinKeyIndices: _*), false)
case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*)
.innerJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case LeftSemi => leftTable.onColumns(joinKeyIndices: _*)
.leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case LeftAnti => leftTable.onColumns(joinKeyIndices: _*)
.leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case FullOuter => leftTable.onColumns(joinKeyIndices: _*)
.fullJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
/**
 * Builds a copy of `table` that keeps only the rows whose join key columns are all non-null.
 *
 * A boolean mask is computed as the logical AND of `isNotNull` over every key column and
 * handed to `table.filter`. The mask is always closed before returning.
 *
 * @param table          the table to filter; closed by this method only when `closeTable`
 *                       is true
 * @param joinKeyIndices the column indices in `table` holding the join keys. Expected to be
 *                       non-empty: with no key columns no mask is built and `table.filter`
 *                       is handed a null mask.
 * @param closeTable     whether `table` should be closed once filtering is done (also on
 *                       failure). Pass false when the table is owned by someone else, e.g.
 *                       a build table that is reused.
 * @return the filtered table, which the caller is responsible for closing
 */
private[this] def filterNulls(table: Table, joinKeyIndices: Range, closeTable: Boolean): Table = {
  var mask: ai.rapids.cudf.ColumnVector = null
  try {
    // Iterate the key column indices themselves. `joinKeyIndices.indices` would yield the
    // positions `0 until length`, which only match the column indices when the range
    // starts at 0 with a step of 1.
    joinKeyIndices.foreach { c =>
      val combined = withResource(table.getColumn(c).isNotNull) { nn =>
        if (mask == null) {
          // first key column: keep `nn` alive past the enclosing `withResource`
          nn.incRefCount()
        } else {
          mask.and(nn)
        }
      }
      // Only swap the mask once the new one exists, so a failure above leaves `mask`
      // pointing at a still-open vector that the `finally` closes exactly once.
      val previous = mask
      mask = combined
      if (previous != null) {
        previous.close()
      }
    }
    table.filter(mask)
  } finally {
    if (mask != null) {
      mask.close()
    }

    // in some cases, we cannot close the table since it was the build table and is
    // reused.
    if (closeTable) {
      table.close()
    }
  }
}

private[this] def doJoinLeftRight(
leftTable: Table, rightTable: Table, closeRightTable: Boolean): ColumnarBatch = {

// Runs `body` against the right-hand table and takes care of its cleanup.
// For left semi / left anti joins with at least one nullable build key, `body` receives a
// copy of the right table with null-key rows removed (that copy is closed afterwards);
// otherwise it receives `rightTable` as is. `rightTable` itself is closed only when
// `closeRightTable` is true.
def withRightTable(body: Table => Table): Table = {
  val isSemiOrAnti = joinType match {
    case LeftSemi | LeftAnti => true
    case _ => false
  }
  // `&&` keeps the nullable check from running for other join types
  val mustDropNullKeys = isSemiOrAnti && gpuBuildKeys.exists(key => key.nullable)

  if (!mustDropNullKeys) {
    try {
      body(rightTable)
    } finally {
      if (closeRightTable) {
        rightTable.close()
      }
    }
  } else {
    // `filterNulls` closes `rightTable` for us when `closeRightTable` is set
    val nonNullRight = filterNulls(rightTable, joinKeyIndices, closeRightTable)
    withResource(nonNullRight) { rt =>
      body(rt)
    }
  }
}

val joinedTable = withRightTable { rt =>
joinType match {
case LeftOuter => leftTable.onColumns(joinKeyIndices: _*)
.leftJoin(rt.onColumns(joinKeyIndices: _*), false)
case RightOuter => rt.onColumns(joinKeyIndices: _*)
.leftJoin(leftTable.onColumns(joinKeyIndices: _*), false)
case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*)
.innerJoin(rt.onColumns(joinKeyIndices: _*), false)
case LeftSemi => leftTable.onColumns(joinKeyIndices: _*)
.leftSemiJoin(rt.onColumns(joinKeyIndices: _*), false)
case LeftAnti => leftTable.onColumns(joinKeyIndices: _*)
.leftAntiJoin(rt.onColumns(joinKeyIndices: _*), false)
case FullOuter => leftTable.onColumns(joinKeyIndices: _*)
.fullJoin(rt.onColumns(joinKeyIndices: _*), false)
case _ =>
throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
}
}

try {
val result = joinIndices.zip(output).map { case (joinIndex, outAttr) =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType)
Expand All @@ -289,5 +348,4 @@ trait GpuHashJoin extends GpuExec {
joinedTable.close()
}
}

}