Skip to content

Commit

Permalink
Filter nulls for left semi and left anti join to work around cudf (NV…
Browse files Browse the repository at this point in the history
…IDIA#1664)

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
  • Loading branch information
abellina authored Feb 4, 2021
1 parent 38353d7 commit 9e806b4
Showing 1 changed file with 90 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,13 @@
package org.apache.spark.sql.rapids.execution

import ai.rapids.cudf.{NvtxColor, Table}
import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBoundReference, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch}
import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch}

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.rapids.GpuAnd
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object GpuHashJoin {
Expand Down Expand Up @@ -232,16 +231,23 @@ trait GpuHashJoin extends GpuExec {
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
val joined = try {
buildSide match {
case GpuBuildLeft => doJoinLeftRight(builtTable, streamedTable)
case GpuBuildRight => doJoinLeftRight(streamedTable, builtTable)
val joined =
withResource(new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)) { _ =>
// `doJoinLeftRight` closes the right table if the last argument (`closeRightTable`)
// is true, but never closes the left table.
buildSide match {
case GpuBuildLeft =>
// tell `doJoinLeftRight` it is ok to close the `streamedTable`, this can help
// in order to close temporary/intermediary data after a filter in some scenarios.
doJoinLeftRight(builtTable, streamedTable, true)
case GpuBuildRight =>
// tell `doJoinLeftRight` to not close `builtTable`, as it is owned by our caller,
// here we close the left table as that one is never closed by `doJoinLeftRight`.
withResource(streamedTable) { _ =>
doJoinLeftRight(streamedTable, builtTable, false)
}
}
}
} finally {
streamedTable.close()
nvtxRange.close()
}

numJoinOutputRows += joined.numRows()

Expand All @@ -262,23 +268,79 @@ trait GpuHashJoin extends GpuExec {
}
}

private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = {
val joinedTable = joinType match {
case LeftOuter => leftTable.onColumns(joinKeyIndices: _*)
.leftJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case RightOuter => rightTable.onColumns(joinKeyIndices: _*)
.leftJoin(leftTable.onColumns(joinKeyIndices: _*), false)
case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*)
.innerJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case LeftSemi => leftTable.onColumns(joinKeyIndices: _*)
.leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case LeftAnti => leftTable.onColumns(joinKeyIndices: _*)
.leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case FullOuter => leftTable.onColumns(joinKeyIndices: _*)
.fullJoin(rightTable.onColumns(joinKeyIndices: _*), false)
case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
// This is a work around added in response to https://github.com/NVIDIA/spark-rapids/issues/1643.
// to deal with slowness arising from many nulls in the build-side of the join. The work around
// should be removed when https://github.com/rapidsai/cudf/issues/7300 is addressed.
private[this] def filterNulls(table: Table, joinKeyIndices: Range, closeTable: Boolean): Table = {
var mask: ai.rapids.cudf.ColumnVector = null
try {
joinKeyIndices.indices.foreach { c =>
mask = withResource(table.getColumn(c).isNotNull) { nn =>
if (mask == null) {
nn.incRefCount()
} else {
withResource(mask) { _ =>
mask.and(nn)
}
}
}
}
table.filter(mask)
} finally {
if (mask != null) {
mask.close()
}

// in some cases, we cannot close the table since it was the build table and is
// reused.
if (closeTable) {
table.close()
}
}
}

private[this] def doJoinLeftRight(
leftTable: Table, rightTable: Table, closeRightTable: Boolean): ColumnarBatch = {

def withRightTable(body: Table => Table): Table = {
val builtAnyNullable =
(joinType == LeftSemi || joinType == LeftAnti) && gpuBuildKeys.exists(_.nullable)

if (builtAnyNullable) {
withResource(filterNulls(rightTable, joinKeyIndices, closeRightTable)) { filtered =>
body(filtered)
}
} else {
try {
body(rightTable)
} finally {
if (closeRightTable) {
rightTable.close()
}
}
}
}

val joinedTable = withRightTable { rt =>
joinType match {
case LeftOuter => leftTable.onColumns(joinKeyIndices: _*)
.leftJoin(rt.onColumns(joinKeyIndices: _*), false)
case RightOuter => rt.onColumns(joinKeyIndices: _*)
.leftJoin(leftTable.onColumns(joinKeyIndices: _*), false)
case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*)
.innerJoin(rt.onColumns(joinKeyIndices: _*), false)
case LeftSemi => leftTable.onColumns(joinKeyIndices: _*)
.leftSemiJoin(rt.onColumns(joinKeyIndices: _*), false)
case LeftAnti => leftTable.onColumns(joinKeyIndices: _*)
.leftAntiJoin(rt.onColumns(joinKeyIndices: _*), false)
case FullOuter => leftTable.onColumns(joinKeyIndices: _*)
.fullJoin(rt.onColumns(joinKeyIndices: _*), false)
case _ =>
throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" +
s" supported")
}
}

try {
val result = joinIndices.zip(output).map { case (joinIndex, outAttr) =>
GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType)
Expand All @@ -289,5 +351,4 @@ trait GpuHashJoin extends GpuExec {
joinedTable.close()
}
}

}

0 comments on commit 9e806b4

Please sign in to comment.