Skip to content

Commit

Permalink
Remove extra null join filtering because cudf is fast for this now. (N…
Browse files Browse the repository at this point in the history
…VIDIA#1391)

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Dec 15, 2020
1 parent 4aa7ad6 commit e0e3661
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class GpuBroadcastHashJoinMeta(
GpuBroadcastHashJoinExec(
leftKeys.map(_.convertToGpu()),
rightKeys.map(_.convertToGpu()),
join.joinType, GpuJoinUtils.getGpuBuildSide(join.buildSide),
join.joinType,
GpuJoinUtils.getGpuBuildSide(join.buildSide),
condition.map(_.convertToGpu()),
left, right)
}
Expand Down Expand Up @@ -145,9 +146,8 @@ case class GpuBroadcastHashJoinExec(
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ case class GpuShuffledHashJoinExec(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
withResource(filterBuiltTableIfNeeded(combined)) { filtered =>
withResource(combined) { combined =>
combinedSize =
GpuColumnVector.extractColumns(filtered)
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(filtered)
GpuColumnVector.from(combined)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class GpuBroadcastHashJoinMeta(
GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition)

val buildSide = join.buildSide match {
case BuildLeft => childPlans(0)
case BuildRight => childPlans(1)
case BuildLeft => childPlans(0)
case BuildRight => childPlans(1)
}

if (!canBuildSideBeReplaced(buildSide)) {
Expand Down Expand Up @@ -123,8 +123,7 @@ case class GpuBroadcastHashJoinExec(
}

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException(
"GpuBroadcastHashJoin does not support row-based processing")
throw new IllegalStateException("GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
Expand All @@ -144,9 +143,8 @@ case class GpuBroadcastHashJoinExec(
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

Expand All @@ -161,4 +159,3 @@ case class GpuBroadcastHashJoinExec(
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class GpuBroadcastHashJoinMeta(
GpuHashJoin.tagJoin(this, join.joinType, join.leftKeys, join.rightKeys, join.condition)

val buildSide = join.buildSide match {
case BuildLeft => childPlans(0)
case BuildRight => childPlans(1)
case BuildLeft => childPlans(0)
case BuildRight => childPlans(1)
}

if (!canBuildSideBeReplaced(buildSide)) {
Expand Down Expand Up @@ -123,8 +123,7 @@ case class GpuBroadcastHashJoinExec(
}

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException(
"GpuBroadcastHashJoin does not support row-based processing")
throw new IllegalStateException("GpuBroadcastHashJoin does not support row-based processing")

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
val numOutputRows = longMetric(NUM_OUTPUT_ROWS)
Expand All @@ -144,9 +143,8 @@ case class GpuBroadcastHashJoinExec(
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

Expand All @@ -161,4 +159,3 @@ case class GpuBroadcastHashJoinExec(
numOutputBatches, streamTime, joinTime, filterTime, totalTime))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ case class GpuShuffledHashJoinExec(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
withResource(filterBuiltTableIfNeeded(combined)) { filtered =>
withResource(combined) { combined =>
combinedSize =
GpuColumnVector.extractColumns(filtered)
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(filtered)
GpuColumnVector.from(combined)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class GpuBroadcastHashJoinMeta(
if (!canThisBeReplaced) {
buildSide.willNotWorkOnGpu("the BroadcastHashJoin this feeds is not on the GPU")
}

}

override def convertToGpu(): GpuExec = {
Expand Down Expand Up @@ -149,9 +148,8 @@ case class GpuBroadcastHashJoinExec(
val ret = withResource(
GpuProjectExec.project(broadcastRelation.value.batch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, broadcastRelation.value.batch))
val filtered = filterBuiltTableIfNeeded(combined)
withResource(filtered) { filtered =>
GpuColumnVector.from(filtered)
withResource(combined) { combined =>
GpuColumnVector.from(combined)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ case class GpuShuffledHashJoinExec(
buildIter, localBuildOutput)) { buildBatch: ColumnarBatch =>
withResource(GpuProjectExec.project(buildBatch, gpuBuildKeys)) { keys =>
val combined = GpuHashJoin.incRefCount(combine(keys, buildBatch))
withResource(filterBuiltTableIfNeeded(combined)) { filtered =>
withResource(combined) { combined =>
combinedSize =
GpuColumnVector.extractColumns(filtered)
GpuColumnVector.extractColumns(combined)
.map(_.getBase.getDeviceMemorySize).sum.toInt
GpuColumnVector.from(filtered)
GpuColumnVector.from(combined)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,65 +155,6 @@ trait GpuHashJoin extends GpuExec {
output.indices.map (v => v + joinLength)
}

// Spark adds rules to filter out nulls for some types of joins, but it does not
// guarantee that all nulls will have been filtered out by the time they get to
// this point. Because of https://github.com/rapidsai/cudf/issues/6052
// we need to filter out the nulls ourselves until that issue is fixed.
// InnerLike | LeftSemi =>
// filter left and right keys
// RightOuter =>
// filter left keys
// LeftOuter | LeftAnti =>
// filter right keys

private[this] lazy val shouldFilterBuiltTableForNulls: Boolean = {
val builtAnyNullable = gpuBuildKeys.exists(_.nullable)
(joinType, buildSide) match {
case (_: InnerLike | LeftSemi, _) => builtAnyNullable
case (RightOuter, GpuBuildLeft) => builtAnyNullable
case (LeftOuter | LeftAnti, GpuBuildRight) => builtAnyNullable
case _ => false
}
}

private[this] lazy val shouldFilterStreamTableForNulls: Boolean = {
val streamedAnyNullable = gpuStreamedKeys.exists(_.nullable)
(joinType, buildSide) match {
case (_: InnerLike | LeftSemi, _) => streamedAnyNullable
case (RightOuter, GpuBuildRight) => streamedAnyNullable
case (LeftOuter | LeftAnti, GpuBuildLeft) => streamedAnyNullable
case _ => false
}
}

private[this] def mkNullFilterExpr(exprs: Seq[GpuExpression]): GpuExpression =
exprs.zipWithIndex.map { kv =>
GpuIsNotNull(GpuBoundReference(kv._2, kv._1.dataType, kv._1.nullable))
}.reduce(GpuAnd)

private[this] lazy val builtTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuBuildKeys)

private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

/**
* Filter the builtBatch if needed. builtBatch will be closed.
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
GpuFilter(builtBatch, builtTableNullFilterExpression)
} else {
builtBatch
}

private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch =
if (shouldFilterStreamTableForNulls) {
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
} else {
streamedBatch
}

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
boundCondition: Option[Expression],
Expand Down Expand Up @@ -287,7 +228,7 @@ trait GpuHashJoin extends GpuExec {
GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
}
}
val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
val streamedTable = withResource(combined) { cb =>
GpuColumnVector.from(cb)
}

Expand Down

0 comments on commit e0e3661

Please sign in to comment.