diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala index b92e086f3a0..69395063083 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,13 @@ package org.apache.spark.sql.rapids.execution import ai.rapids.cudf.{NvtxColor, Table} -import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBoundReference, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch} +import com.nvidia.spark.rapids.{CoalesceGoal, GpuBindReferences, GpuBuildLeft, GpuBuildRight, GpuBuildSide, GpuColumnVector, GpuExec, GpuExpression, GpuFilter, GpuIsNotNull, GpuProjectExec, NvtxWithMetrics, RapidsMeta, RequireSingleBatch} import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.rapids.GpuAnd import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object GpuHashJoin { @@ -232,16 +231,23 @@ trait GpuHashJoin extends GpuExec { GpuColumnVector.from(cb) } - val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime) - val joined = try { - buildSide match { - case GpuBuildLeft => doJoinLeftRight(builtTable, streamedTable) - case GpuBuildRight => doJoinLeftRight(streamedTable, builtTable) + val joined = + withResource(new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)) { _ => + // `doJoinLeftRight` closes the right table if the last argument (`closeRightTable`) + // is true, but never closes the left table. + buildSide match { + case GpuBuildLeft => + // tell `doJoinLeftRight` it is ok to close the `streamedTable`, this can help + // in order to close temporary/intermediary data after a filter in some scenarios. + doJoinLeftRight(builtTable, streamedTable, true) + case GpuBuildRight => + // tell `doJoinLeftRight` to not close `builtTable`, as it is owned by our caller, + // here we close the left table as that one is never closed by `doJoinLeftRight`. + withResource(streamedTable) { _ => + doJoinLeftRight(streamedTable, builtTable, false) + } + } } - } finally { - streamedTable.close() - nvtxRange.close() - } numJoinOutputRows += joined.numRows() @@ -262,23 +268,79 @@ trait GpuHashJoin extends GpuExec { } } - private[this] def doJoinLeftRight(leftTable: Table, rightTable: Table): ColumnarBatch = { - val joinedTable = joinType match { - case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) - .leftJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case RightOuter => rightTable.onColumns(joinKeyIndices: _*) - .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) - case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) - .innerJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) - .leftSemiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) - .leftAntiJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case FullOuter => leftTable.onColumns(joinKeyIndices: _*) - .fullJoin(rightTable.onColumns(joinKeyIndices: _*), false) - case _ => throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + - s" supported") + // This is a work around added in response to https://github.com/NVIDIA/spark-rapids/issues/1643. + // to deal with slowness arising from many nulls in the build-side of the join. The work around + // should be removed when https://github.com/rapidsai/cudf/issues/7300 is addressed. + private[this] def filterNulls(table: Table, joinKeyIndices: Range, closeTable: Boolean): Table = { + var mask: ai.rapids.cudf.ColumnVector = null + try { + joinKeyIndices.indices.foreach { c => + mask = withResource(table.getColumn(c).isNotNull) { nn => + if (mask == null) { + nn.incRefCount() + } else { + withResource(mask) { _ => + mask.and(nn) + } + } + } + } + table.filter(mask) + } finally { + if (mask != null) { + mask.close() + } + + // in some cases, we cannot close the table since it was the build table and is + // reused. + if (closeTable) { + table.close() + } + } + } + + private[this] def doJoinLeftRight( + leftTable: Table, rightTable: Table, closeRightTable: Boolean): ColumnarBatch = { + + def withRightTable(body: Table => Table): Table = { + val builtAnyNullable = + (joinType == LeftSemi || joinType == LeftAnti) && gpuBuildKeys.exists(_.nullable) + + if (builtAnyNullable) { + withResource(filterNulls(rightTable, joinKeyIndices, closeRightTable)) { filtered => + body(filtered) + } + } else { + try { + body(rightTable) + } finally { + if (closeRightTable) { + rightTable.close() + } + } + } + } + + val joinedTable = withRightTable { rt => + joinType match { + case LeftOuter => leftTable.onColumns(joinKeyIndices: _*) + .leftJoin(rt.onColumns(joinKeyIndices: _*), false) + case RightOuter => rt.onColumns(joinKeyIndices: _*) + .leftJoin(leftTable.onColumns(joinKeyIndices: _*), false) + case _: InnerLike => leftTable.onColumns(joinKeyIndices: _*) + .innerJoin(rt.onColumns(joinKeyIndices: _*), false) + case LeftSemi => leftTable.onColumns(joinKeyIndices: _*) + .leftSemiJoin(rt.onColumns(joinKeyIndices: _*), false) + case LeftAnti => leftTable.onColumns(joinKeyIndices: _*) + .leftAntiJoin(rt.onColumns(joinKeyIndices: _*), false) + case FullOuter => leftTable.onColumns(joinKeyIndices: _*) + .fullJoin(rt.onColumns(joinKeyIndices: _*), false) + case _ => + throw new NotImplementedError(s"Joint Type ${joinType.getClass} is not currently" + + s" supported") + } } + try { val result = joinIndices.zip(output).map { case (joinIndex, outAttr) => GpuColumnVector.from(joinedTable.getColumn(joinIndex).incRefCount(), outAttr.dataType) @@ -289,5 +351,4 @@ trait GpuHashJoin extends GpuExec { joinedTable.close() } } - }