Filter rows with null keys when coalescing due to reaching cuDF row limits [databricks] (NVIDIA#5531)

* Filter rows with null keys when coalescing due to reaching cuDF row limits

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>

* Fix typos

* Add RequireSingleBatchLike to be able to use RequireSingleBatchWithFilter as a real goal

* GpuShuffledHashJoin can now use RequireSingleBatchWithFilter as a coalesce goal

* Remove extra comment

* Partially address code review comments

* Null out batch after filtering to prevent double closing
abellina authored May 23, 2022
1 parent eb0f23e commit 5f33368
Showing 8 changed files with 124 additions and 27 deletions.
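
In outline, the change works like this: when coalescing toward a single batch would push the row count past cuDF's limit of Int.MaxValue and the goal is RequireSingleBatchWithFilter, the iterator filters null-keyed rows out of both the data gathered so far and the incoming batch, re-checks the limit, and from then on filters every further input batch. The sketch below is a simplified illustration of that control flow, not the plugin's actual method signatures; `gatheredRows`, `incomingRows`, and `filterNulls` are hypothetical stand-ins for the iterator's real state.

def rowsAfterLimitCheck(goal: CoalesceSizeGoal,
                        gatheredRows: Long,
                        incomingRows: Long,
                        filterNulls: () => (Long, Long)): Long = {
  val wouldBeRows = gatheredRows + incomingRows
  if (wouldBeRows <= Int.MaxValue) {
    wouldBeRows
  } else goal match {
    case RequireSingleBatch =>
      // no fallback: a single batch is required but cannot fit under the cuDF limit
      throw new IllegalStateException("over the cuDF row limit")
    case RequireSingleBatchWithFilter(_) =>
      // drop rows with null join keys from both the gathered data and the incoming
      // batch, then re-check the limit
      val (gatheredFiltered, incomingFiltered) = filterNulls()
      if (gatheredFiltered + incomingFiltered > Int.MaxValue) {
        throw new IllegalStateException("still over the limit after null filtering")
      }
      gatheredFiltered + incomingFiltered
    case _ =>
      // size-based goals just set the incoming batch aside for the next output batch
      gatheredRows
  }
}
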
@@ -105,8 +105,8 @@ object ConcatAndConsumeAll {

object CoalesceGoal {
def maxRequirement(a: CoalesceGoal, b: CoalesceGoal): CoalesceGoal = (a, b) match {
case (RequireSingleBatch, _) => a
case (_, RequireSingleBatch) => b
case (_: RequireSingleBatchLike, _) => a
case (_, _: RequireSingleBatchLike) => b
case (_: BatchedByKey, _: TargetSize) => a
case (_: TargetSize, _: BatchedByKey) => b
case (a: BatchedByKey, b: BatchedByKey) =>
@@ -121,8 +121,8 @@ object CoalesceGoal {
}

def minProvided(a: CoalesceGoal, b:CoalesceGoal): CoalesceGoal = (a, b) match {
case (RequireSingleBatch, _) => b
case (_, RequireSingleBatch) => a
case (_: RequireSingleBatchLike, _) => b
case (_, _: RequireSingleBatchLike) => a
case (_: BatchedByKey, _: TargetSize) => b
case (_: TargetSize, _: BatchedByKey) => a
case (a: BatchedByKey, b: BatchedByKey) =>
@@ -136,8 +136,8 @@ object CoalesceGoal {
}

def satisfies(found: CoalesceGoal, required: CoalesceGoal): Boolean = (found, required) match {
case (RequireSingleBatch, _) => true
case (_, RequireSingleBatch) => false
case (_: RequireSingleBatchLike, _) => true
case (_, _: RequireSingleBatchLike) => false
case (_: BatchedByKey, _: TargetSize) => true
case (_: TargetSize, _: BatchedByKey) => false
case (BatchedByKey(aOrder), BatchedByKey(bOrder)) =>
@@ -167,19 +167,41 @@ sealed abstract class CoalesceSizeGoal extends CoalesceGoal {
}

/**
* A single batch is required as the input to a note in the SparkPlan. This means
* Trait used for pattern matching for single batch coalesce goals.
*/
trait RequireSingleBatchLike

/**
* A single batch is required as the input to a node in the SparkPlan. This means
* all of the data for a given task is in a single batch. This should be avoided
* as much as possible because it can result in running out of memory or running into
* limitations of the batch size by both Spark and cudf.
*/
case object RequireSingleBatch extends CoalesceSizeGoal {
case object RequireSingleBatch extends CoalesceSizeGoal with RequireSingleBatchLike {

override val targetSizeBytes: Long = Long.MaxValue

/** Override toString to improve readability of Spark explain output */
override def toString: String = "RequireSingleBatch"
}

/**
* This is exactly the same as `RequireSingleBatch` except that if the
* batch would fail to coalesce because it reaches cuDF row-count limits, the
* coalesce code is free to filter out nulls using the expression in `filterExpression`.
* @note This is an ugly hack because ideally these rows are never read from the input source
* given that we normally push down IsNotNull in Spark. This should be removed when
* we can handle this in a proper way, likely at the logical plan optimization level.
* More details here: https://issues.apache.org/jira/browse/SPARK-39131
*/
case class RequireSingleBatchWithFilter(filterExpression: GpuExpression)
extends CoalesceSizeGoal with RequireSingleBatchLike {

override val targetSizeBytes: Long = Long.MaxValue

/** Override toString to improve readability of Spark explain output */
override def toString: String = "RequireSingleBatchWithFilter"
}
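
// Hedged example (not part of this commit): both single-batch goals now extend the
// RequireSingleBatchLike marker trait, so callers should match on the trait instead of
// comparing against the RequireSingleBatch object; an equality check would miss
// RequireSingleBatchWithFilter. `requiresSingleBatch` is an illustrative helper only.
def requiresSingleBatch(goal: CoalesceGoal): Boolean = goal match {
  case _: RequireSingleBatchLike => true
  case _ => false
}
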
/**
* Produce a stream of batches that are at most the given size in bytes. The size
* is estimated in some cases so it may go over a little, but it should generally be
Expand Down Expand Up @@ -228,6 +250,12 @@ abstract class AbstractGpuCoalesceIterator(

private var batchInitialized: Boolean = false

/**
* This is defined iff `goal` is `RequireSingleBatchWithFilter` and we have
* reached the cuDF row-count limit.
*/
private var inputFilterExpression: Option[Expression] = None

/**
* Return true if there is something saved on deck for later processing.
*/
@@ -351,7 +379,17 @@ abstract class AbstractGpuCoalesceIterator(

// there is a hard limit of 2^31 rows
while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) {
closeOnExcept(iter.next()) { cb =>
val cbFromIter = iter.next()

var cb = if (inputFilterExpression.isDefined) {
// If we have reached the cuDF limit once, proactively filter batches
// after that first limit is reached.
GpuFilter(cbFromIter, inputFilterExpression.get)
} else {
cbFromIter
}

closeOnExcept(cb) { _ =>
val nextRows = cb.numRows()
numInputBatches += 1

@@ -366,12 +404,40 @@
val wouldBeBytes = numBytes + nextBytes

if (wouldBeRows > Int.MaxValue) {
if (goal == RequireSingleBatch) {
throw new IllegalStateException("A single batch is required for this operation," +
goal match {
case RequireSingleBatch =>
throw new IllegalStateException("A single batch is required for this operation," +
s" but cuDF only supports ${Int.MaxValue} rows. At least $wouldBeRows" +
s" are in this partition. Please try increasing your partition count.")
case RequireSingleBatchWithFilter(filterExpression) =>
// filter what we had already stored
val filteredDown = GpuFilter(concatAllAndPutOnGPU(), filterExpression)
closeOnExcept(filteredDown) { _ =>
// filter the incoming batch as well
closeOnExcept(GpuFilter(cb, filterExpression)) { filteredCb =>
cb = null // null out `cb` to prevent multiple close calls
val filteredWouldBeRows = filteredDown.numRows() + filteredCb.numRows()
if (filteredWouldBeRows > Int.MaxValue) {
throw new IllegalStateException(
"A single batch is required for this operation, but cuDF only supports " +
s"${Int.MaxValue} rows. At least $filteredWouldBeRows are in this " +
"partition, even after filtering nulls. " +
"Please try increasing your partition count.")
}
if (inputFilterExpression.isEmpty) {
inputFilterExpression = Some(filterExpression)
logWarning("Switched to null-filtering mode. This coalesce iterator " +
"succeeded to fit rows under the cuDF limit only after null filtering. " +
"Please try increasing your partition count.")
}
numRows = filteredWouldBeRows
numBytes = getBatchDataSize(filteredDown) + getBatchDataSize(filteredCb)
addBatch(filteredDown)
addBatch(filteredCb)
}
}
case _ => saveOnDeck(cb) // not a single batch requirement
}
saveOnDeck(cb)
} else if (batchRowLimit > 0 && wouldBeRows > batchRowLimit) {
saveOnDeck(cb)
} else if (wouldBeBytes > goal.targetSizeBytes && numBytes > 0) {
@@ -394,9 +460,13 @@
val isLastBatch = !(hasOnDeck || iter.hasNext)

// enforce single batch limit when appropriate
if (goal == RequireSingleBatch && !isLastBatch) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
if (!isLastBatch) {
goal match {
case _: RequireSingleBatchLike =>
throw new IllegalStateException("A single batch is required for this operation," +
" Please try increasing your partition count.")
case _ =>
}
}

numOutputRows += numRows
@@ -641,7 +641,7 @@ class RowToColumnarIterator(
}

// enforce RequireSingleBatch limit
if (rowIter.hasNext && localGoal == RequireSingleBatch) {
if (rowIter.hasNext && localGoal.isInstanceOf[RequireSingleBatchLike]) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
}
@@ -24,11 +24,13 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType}
import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.rapids.GpuOr
import org.apache.spark.sql.rapids.execution.{GpuHashJoin, JoinTypeChecks}
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuShuffledHashJoinMeta(
@@ -117,10 +119,24 @@ case class GpuShuffledHashJoinExec(
"GpuShuffledHashJoin does not support the execute() code path.")
}

// Goal to be used for coalescing the build side. Note that this is internal to
// the join and not used for planning purposes. The two valid choices are `RequireSingleBatch` or
// `RequireSingleBatchWithFilter`.
private lazy val buildGoal: CoalesceSizeGoal = joinType match {
case _: InnerLike | LeftSemi | LeftAnti =>
val nullFilteringMask = boundBuildKeys.map { bk =>
// coalesce(key1, false) or coalesce(key2, false) ... or ... coalesce(keyN, false)
// For any row with a key that is null, this filter mask will remove those rows.
GpuCoalesce(Seq(GpuCast(bk, BooleanType), GpuLiteral(false)))
}.reduce(GpuOr)
RequireSingleBatchWithFilter(nullFilteringMask)
case _ => RequireSingleBatch
}
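
// Hedged note, not part of this commit: dropping build-side rows with null join keys is
// only safe here because, for inner, left semi, and left anti equi-joins, a build row
// whose key is NULL can never match anything. For example, in
//   SELECT * FROM stream s JOIN build b ON s.k = b.k
// a build row with b.k = NULL contributes no output, so filtering it out cannot change
// the result. Join types such as full outer still need unmatched build-side rows in the
// output, so the remaining cases keep the plain RequireSingleBatch goal.
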

override def childrenCoalesceGoal: Seq[CoalesceGoal] = (joinType, buildSide) match {
case (FullOuter, _) => Seq(RequireSingleBatch, RequireSingleBatch)
case (_, GpuBuildLeft) => Seq(RequireSingleBatch, null)
case (_, GpuBuildRight) => Seq(null, RequireSingleBatch)
case (_, GpuBuildLeft) => Seq(buildGoal, null)
case (_, GpuBuildRight) => Seq(null, buildGoal)
}

override def doExecuteColumnar() : RDD[ColumnarBatch] = {
@@ -149,6 +165,7 @@ case class GpuShuffledHashJoinExec(
(streamIter, buildIter) => {
val (builtBatch, maybeBufferedStreamIter) =
GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
buildGoal,
batchSizeBytes,
localBuildOutput,
buildIter,
@@ -219,6 +236,7 @@ object GpuShuffledHashJoinExec extends Arm {
* because we hold onto the semaphore during the entire time after realizing the goal
* has been hit.
*
* @param buildGoal the build goal to use when coalescing batches
* @param hostTargetBatchSize target batch size goal on the host
* @param buildOutput output attributes of the build plan
* @param buildIter build iterator
@@ -230,6 +248,7 @@ object GpuShuffledHashJoinExec extends Arm {
* used for the join
*/
def getBuiltBatchAndStreamIter(
buildGoal: CoalesceSizeGoal,
hostTargetBatchSize: Long,
buildOutput: Seq[Attribute],
buildIter: Iterator[ColumnarBatch],
@@ -295,7 +314,7 @@ object GpuShuffledHashJoinExec extends Arm {
}
} else {
val buildBatch = getBuildBatchFromUnfinished(
Seq(hostConcatResult).iterator ++ hostConcatIter,
buildGoal, Seq(hostConcatResult).iterator ++ hostConcatIter,
buildOutput, spillCallback, coalesceMetricsMap)
buildTime += System.nanoTime() - startTime
(buildBatch, streamIter)
@@ -307,6 +326,7 @@
}

private def getBuildBatchFromUnfinished(
buildGoal: CoalesceSizeGoal,
iterWithPrior: Iterator[HostConcatResult],
buildOutput: Seq[Attribute],
spillCallback: SpillCallback,
@@ -322,10 +342,9 @@
iterWithPrior,
dataTypes,
coalesceMetricsMap)
val res = ConcatAndConsumeAll.getSingleBatchWithVerification(
val res = ConcatAndConsumeAll.getSingleBatchWithVerification(
new GpuCoalesceIterator(shuffleCoalesce,
dataTypes,
RequireSingleBatch,
dataTypes, buildGoal,
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric,
coalesceMetricsMap(GpuMetric.CONCAT_TIME),
coalesceMetricsMap(GpuMetric.OP_TIME),
@@ -263,7 +263,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
case (plan, null) =>
// No coalesce requested
insertCoalesce(plan, disableUntilInput)
case (plan, goal @ RequireSingleBatch) =>
case (plan, goal: RequireSingleBatchLike) =>
// Even if coalesce is disabled a single batch is required to make this operator work
// This should not cause bugs because we require a single batch in situations where
// Spark also buffers data, so any operator that needs coalesce disabled would also
@@ -188,7 +188,7 @@ class HostToGpuCoalesceIterator(iter: Iterator[ColumnarBatch],
opName) {

// RequireSingleBatch goal is intentionally not supported in this iterator
assert(goal != RequireSingleBatch)
assert(!goal.isInstanceOf[RequireSingleBatchLike])

var batchBuilder: GpuColumnVector.GpuColumnarBatchBuilderBase = _
var totalRows = 0
@@ -634,7 +634,7 @@ trait GpuHashJoin extends GpuExec {
case GpuBuildLeft => GpuExec.outputBatching(right)
case GpuBuildRight => GpuExec.outputBatching(left)
}
if (batching == RequireSingleBatch) {
if (batching.isInstanceOf[RequireSingleBatchLike]) {
RequireSingleBatch
} else {
null
@@ -603,7 +603,7 @@ private class ExternalRowToColumnarIterator(
}

// enforce single batch limit when appropriate
if (rowIter.hasNext && localGoal == RequireSingleBatch) {
if (rowIter.hasNext && localGoal.isInstanceOf[RequireSingleBatchLike]) {
throw new IllegalStateException("A single batch is required for this operation." +
" Please try increasing your partition count.")
}
@@ -39,6 +39,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
when(mockBuildIter.hasNext).thenReturn(false)
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
mockBuildIter,
@@ -71,6 +72,7 @@ class GpuShuffledHashJoinExecSuite extends FunSuite with Arm with MockitoSugar {
when(buildIter.buffered).thenReturn(buildBufferedIter)
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
buildIter,
@@ -103,6 +105,7 @@
val buildIter = Seq(emptyBatch).iterator
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
buildIter,
@@ -129,6 +132,7 @@
val buildIter = Seq(batch).iterator
val mockStreamIter = mock[Iterator[ColumnarBatch]]
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
0,
Seq.empty,
buildIter,
@@ -182,6 +186,7 @@
val buildIter = Seq(serializedBatch).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1024,
attrs,
buildIter,
@@ -215,6 +220,7 @@
val buildIter = Seq(serializedBatch).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1024,
attrs,
buildIter,
@@ -252,6 +258,7 @@
val buildIter = Seq(serializedBatch, serializedBatch2).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1,
attrs,
buildIter,
@@ -290,6 +297,7 @@
val buildIter = Seq(serializedBatch, serializedBatch2).iterator
val attrs = AttributeReference("a", IntegerType, false)() :: Nil
val (builtBatch, bStreamIter) = GpuShuffledHashJoinExec.getBuiltBatchAndStreamIter(
RequireSingleBatch,
1024,
attrs,
buildIter,