Add a heuristic to skip second or third agg pass #10950

Merged: 17 commits, merged on Jun 29, 2024
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
@@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|0.9|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
<a name="sql.castFloatToDecimal.enabled"></a>spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
11 changes: 7 additions & 4 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -29,12 +29,15 @@
pytestmark = pytest.mark.nightly_resource_consuming_test

_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}

_float_smallbatch_conf = copy_and_update(_float_conf,
{'spark.rapids.sql.batchSizeBytes' : '250'})

_float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})

_float_conf_partial = copy_and_update(_float_conf,
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})

@@ -221,8 +224,8 @@ def get_params(init_list, marked_params=[]):
return list


# Run these tests in 4 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_final, _float_conf_partial]
# Run these tests in 5 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
4 changes: 2 additions & 2 deletions jenkins/spark-premerge-build.sh
@@ -162,7 +162,7 @@ ci_2() {
$MVN_CMD -U -B $MVN_URM_MIRROR clean package $MVN_BUILD_ARGS -DskipTests=true
export TEST_TAGS="not premerge_ci_1"
export TEST_TYPE="pre-commit"
export TEST_PARALLEL=5
export TEST_PARALLEL=4
Collaborator Author: this is a workaround for #8652

Collaborator: This is okay as a short-term workaround, but I am not going to approve a PR with this in it.


# Download a Scala 2.12 build of spark
prepare_spark $SPARK_VER 2.12
@@ -206,7 +206,7 @@ ci_scala213() {
cd .. # Run integration tests in the project root dir to leverage test cases and resource files
export TEST_TAGS="not premerge_ci_1"
export TEST_TYPE="pre-commit"
export TEST_PARALLEL=5
export TEST_PARALLEL=4
# SPARK_HOME (and related) must be set to a Spark built with Scala 2.13
SPARK_HOME=$SPARK_HOME PYTHONPATH=$PYTHONPATH \
./integration_tests/run_pyspark_from_build.sh
130 changes: 106 additions & 24 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
jlowe marked this conversation as resolved.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
import java.util

import scala.annotation.tailrec
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.mutable

import ai.rapids.cudf
@@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging {
object GpuAggFirstPassIterator {
def apply(cbIter: Iterator[ColumnarBatch],
aggHelper: AggHelper,
metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = {
metrics: GpuHashAggregateMetrics
): Iterator[SpillableColumnarBatch] = {
val preprocessProjectIter = cbIter.map { cb =>
val sb = SpillableColumnarBatch (cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch (sb)
@@ -707,6 +709,9 @@ object GpuAggFinalPassIterator {
* @param metrics metrics that will be updated during aggregation
* @param configuredTargetBatchSize user-specified value for the targeted input batch size
* @param useTieredProject user-specified option to enable tiered projections
* @param allowNonFullyAggregatedOutput whether we are allowed to skip the third aggregation pass
* @param skipAggPassReductionRatio skip the next pass if the fraction of rows remaining after a pass is greater than this value
* @param localInputRowsCount metric to track the number of input rows processed locally
*/
class GpuMergeAggregateIterator(
firstPassIter: Iterator[SpillableColumnarBatch],
@@ -718,15 +723,21 @@ class GpuMergeAggregateIterator(
modeInfo: AggregateModeInfo,
metrics: GpuHashAggregateMetrics,
configuredTargetBatchSize: Long,
useTieredProject: Boolean)
useTieredProject: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double,
localInputRowsCount: LocalGpuMetric)
extends Iterator[ColumnarBatch] with AutoCloseable with Logging {
private[this] val isReductionOnly = groupingExpressions.isEmpty
private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)
private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch]
private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None

/** Iterator for fetching aggregated batches if a sort-based fallback has occurred */
private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None
/** Iterator for fetching aggregated batches when either:
* 1. a sort-based fallback has occurred, or
* 2. the third aggregation pass has been skipped
*/
private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None

/** Whether a batch is pending for a reduction-only aggregation */
private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly
@@ -739,24 +750,61 @@ }
}

override def hasNext: Boolean = {
sortFallbackIter.map(_.hasNext).getOrElse {
fallbackIter.map(_.hasNext).getOrElse {
// reductions produce a result even if the input is empty
hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext
}
}

override def next(): ColumnarBatch = {
sortFallbackIter.map(_.next()).getOrElse {
fallbackIter.map(_.next()).getOrElse {
var shouldSkipThirdPassAgg = false

// aggregate and merge all pending inputs
if (firstPassIter.hasNext) {
aggregateInputBatches()
tryMergeAggregatedBatches()
// first pass agg
val rowsAfterFirstPassAgg = aggregateInputBatches()

// by now firstPassIter has been traversed, so localInputRowsCount is finished updating
if (isReductionOnly ||
skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) {
// second pass agg
tryMergeAggregatedBatches()

val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
(totalRows, batch) => totalRows + batch.numRows()
}
shouldSkipThirdPassAgg =
Collaborator:

Could you please correct me if I get anything wrong about the algorithm.

It looks like we do a first pass over the data and we place the results in the aggregatedBatches queue. We keep track of the total number of rows that survived that initial aggregation pass along with the number of rows that were input to this aggregation. If the number of rows after the first agg pass was reduced by at least X% where X is (1 - spark.rapids.sql.agg.skipAggPassReductionRatio), then we continue trying to combine the results together.

Otherwise we set "shouldSkipThirdPassAgg", which is confusing because we are not doing a third pass; this would be a second aggregation pass, but whatever.

Combining the results together is done by calling tryMergeAggregatedBatches(). This code will try and concatenate batches together up to a target input size and then merge those batches. It stops if we get to a single output batch, or we hit a situation where we could not find two batches that would fit in the target merge batch size.

At that point, if we didn't reduce the number of rows by X% again, then we will also set "shouldSkipThirdPassAgg".

At this point either the data is aggregated into a single batch that we can output, or we have multiple batches to deal with. If we are going to skip the third pass agg (and it is not a final aggregation), then we just output what we have done so far. Otherwise we do the sort fallback iterator.

That looks okay to me, but I am curious whether you think we are being aggressive enough with this, and whether we want follow-on work to make it more aggressive. I am just a little concerned that we might do an entire first pass over multiple batches, when it is clear from the very first batch that this is just not going to work. We have to be very careful that we don't change the order of the batches so things like first and last still work correctly, but that should not be too difficult.

What if we read an input batch and check whether it reduced the number of rows by X%; if not, we just instantly return it. No need to cache that batch at all. We just let downstream deal with the large batch we just saw. We keep doing that until we hit a situation where we did see something reduce in size. Then we start to cache them, so that we don't mess up the order. I am concerned about spilling and memory pressure. A map-side aggregation is almost always going to feed into a shuffle, and if we can release some of the batches quickly we reduce the GPU memory pressure, because they should be pulled back off of the GPU without us needing to cache anything.

But it looks like we ask the firstPassIterator if there is more to process. firstPassIterator reads in the original input data, does the pre-project, and does an initial aggregation. aggregateInputBatches reads in those input batches and places them into a queue.

Collaborator Author:

second pass agg -> tryMergeAggregatedBatches(), third pass agg -> buildSortFallbackIterator(); the naming should be correct.
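
As a reading aid for the thread above, here is a small, self-contained sketch of the skip condition. The names and numbers are illustrative, not plugin API; the real decision is made inside GpuMergeAggregateIterator as shown in this diff.

```scala
// A follow-up aggregation pass is skipped when the previous pass kept more than
// `ratio` of its input rows, i.e. when it reduced the row count by less than (1 - ratio).
object SkipHeuristicSketch {
  def shouldSkipNextPass(rowsBefore: Long, rowsAfter: Long, ratio: Double): Boolean =
    rowsAfter > ratio * rowsBefore

  def main(args: Array[String]): Unit = {
    val ratio = 0.9 // default of spark.rapids.sql.agg.skipAggPassReductionRatio
    // 1,000,000 rows shrank only to 980,000 (~2% reduction): skip the next pass.
    println(shouldSkipNextPass(1000000L, 980000L, ratio)) // true
    // 1,000,000 rows shrank to 400,000 (60% reduction): keep aggregating.
    println(shouldSkipNextPass(1000000L, 400000L, ratio)) // false
  }
}
```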

rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg
} else {
shouldSkipThirdPassAgg = true
logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " +
s"${skipAggPassReductionRatio * 100}% of " +
s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg")
}
}

if (aggregatedBatches.size() > 1) {
// Unable to merge to a single output, so must fall back to a sort-based approach.
sortFallbackIter = Some(buildSortFallbackIterator())
sortFallbackIter.get.next()
// Unable to merge to a single output, so must fall back
if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) {
// skip third pass agg, return the aggregated batches directly
logInfo(s"Rows after second pass aggregation exceeds " +
s"${skipAggPassReductionRatio * 100}% of " +
s"rows after first pass, skip the third pass agg")
fallbackIter = Some(new Iterator[ColumnarBatch] {
override def hasNext: Boolean = !aggregatedBatches.isEmpty

override def next(): ColumnarBatch = {
withResource(aggregatedBatches.pop()) { spillableBatch =>
spillableBatch.getColumnarBatch()
}
}
})
} else {
// fallback to sort agg, this is the third pass agg
fallbackIter = Some(buildSortFallbackIterator())
}
fallbackIter.get.next()
} else if (aggregatedBatches.isEmpty) {
if (hasReductionOnlyBatch) {
hasReductionOnlyBatch = false
@@ -779,7 +827,7 @@
aggregatedBatches.clear()
outOfCoreIter.foreach(_.close())
outOfCoreIter = None
sortFallbackIter = None
fallbackIter = None
hasReductionOnlyBatch = false
}

@@ -789,11 +837,15 @@ }
}

/** Aggregate all input batches and place the results in the aggregatedBatches queue. */
private def aggregateInputBatches(): Unit = {
private def aggregateInputBatches(): Long = {
var rowsAfter = 0L
// cache everything in the first pass
while (firstPassIter.hasNext) {
aggregatedBatches.add(firstPassIter.next())
val batch = firstPassIter.next()
rowsAfter += batch.numRows()
aggregatedBatches.add(batch)
}
rowsAfter
}

/**
@@ -1115,8 +1167,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](

/*
* Type inferencing by the Scala compiler will choose the most specific return type
* something like Array[Set[Product with Serializable with AggregateMode]] or with
* slight differences depending on Scala version. Here we ensure this is
* something like Array[Set[Product with Serializable with AggregateMode]] or with
* slight differences depending on Scala version. Here we ensure this is
* Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly.
*/
val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern =>
Expand Down Expand Up @@ -1189,6 +1241,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
mode == Partial || mode == PartialMerge
} && agg.groupingExpressions.nonEmpty // Don't do this for a reduce...

// for an aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final,
// so don't allow it.
lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode =>
mode == Partial || mode == PartialMerge
} && agg.aggregateExpressions.nonEmpty

lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr =>
orderable.isSupportedByPlugin(expr.dataType)
}
@@ -1272,7 +1330,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
useTiered,
estimatedPreProcessGrowth,
conf.forceSinglePassPartialSortAgg,
allowSinglePassAgg)
allowSinglePassAgg,
allowNonFullyAggregatedOutput,
conf.skipAggPassReductionRatio)
}
}

@@ -1358,7 +1418,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega
// For now we are just going to go with the original hash aggregation
1.0,
false,
false)
false,
false,
1)
} else {
super.convertToGpu()
}
@@ -1707,6 +1769,10 @@ object GpuHashAggregateExecBase {
* @param child incoming plan (where we get input columns from)
* @param configuredTargetBatchSize user-configured maximum device memory size of a batch
* @param configuredTieredProjectEnabled configurable optimization to use tiered projections
* @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation
*                                      (the output may then contain data that is not fully
*                                      aggregated, which is acceptable for non-final stages)
* @param skipAggPassReductionRatio skip the next pass if the fraction of rows remaining after a
*                                  pass is greater than this value
*/
case class GpuHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -1719,7 +1785,10 @@
configuredTieredProjectEnabled: Boolean,
estimatedPreProcessGrowth: Double,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec {
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double
) extends ShimUnaryExecNode with GpuExec {

// lifted directly from `BaseAggregateExec.inputAttributes`, edited comment.
def inputAttributes: Seq[Attribute] =
@@ -1804,7 +1873,7 @@ case class GpuHashAggregateExec(
boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo,
localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering,
postBoundReferences, targetBatchSize, aggMetrics, useTieredProject,
localForcePre, localAllowPre)
localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio)
}
}

@@ -1920,7 +1989,10 @@ class DynamicGpuPartialSortAggregateIterator(
metrics: GpuHashAggregateMetrics,
useTiered: Boolean,
forceSinglePassAgg: Boolean,
allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] {
allowSinglePassAgg: Boolean,
allowNonFullyAggregatedOutput: Boolean,
skipAggPassReductionRatio: Double
) extends Iterator[ColumnarBatch] {
private var aggIter: Option[Iterator[ColumnarBatch]] = None
private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty

@@ -1998,7 +2070,14 @@
inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound,
metrics.opTime, metrics.numPreSplits)

val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics)
val localInputRowsMetrics = new LocalGpuMetric
val firstPassIter = GpuAggFirstPassIterator(
splitInputIter.map(cb => {
localInputRowsMetrics += cb.numRows()
cb
}),
preProcessAggHelper,
metrics)

val mergeIter = new GpuMergeAggregateIterator(
firstPassIter,
Expand All @@ -2010,7 +2089,10 @@ class DynamicGpuPartialSortAggregateIterator(
modeInfo,
metrics,
configuredTargetBatchSize,
useTiered)
useTiered,
allowNonFullyAggregatedOutput,
skipAggPassReductionRatio,
localInputRowsMetrics)

GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics)
}
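
One detail worth calling out in the hunk above is the lazy row counting: localInputRowsMetrics only reaches its final value once firstPassIter has been fully drained, which is why GpuMergeAggregateIterator reads it only after the first pass completes. A minimal sketch of the same pattern with plain collections (RowCounter is a stand-in for LocalGpuMetric, purely for illustration):

```scala
object RowCountingSketch {
  // Stand-in for LocalGpuMetric, purely for illustration.
  final class RowCounter {
    var value: Long = 0L
    def +=(n: Long): Unit = value += n
  }

  // Wrap an iterator so each element is counted as it flows through.
  def counting[T](it: Iterator[T], counter: RowCounter)(rows: T => Long): Iterator[T] =
    it.map { elem => counter += rows(elem); elem }

  def main(args: Array[String]): Unit = {
    val counter = new RowCounter
    // Plain Seq values stand in for ColumnarBatch here.
    val wrapped = counting(Iterator(Seq(1, 2, 3), Seq(4, 5)), counter)(_.size.toLong)
    assert(counter.value == 0L) // nothing consumed yet, so the count lags
    wrapped.foreach(_ => ())    // drain, as the first aggregation pass does
    assert(counter.value == 5L) // only now is the total input row count known
  }
}
```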
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1509,6 +1509,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
.doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " +
"greater than this value, the next aggregation pass will be skipped." +
"Setting this to 1 essentially disables this feature.")
.doubleConf
.checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
.createWithDefault(0.9)
Collaborator:
How did you reach 90% as the cutoff? Is it just arbitrary, or did you do some tuning/testing to figure this out?

Collaborator Author (@binmahone, Jun 11, 2024):

It is arbitrary. I chose 0.9 for fear of putting too much pressure on shuffle.
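
For intuition about the tradeoff: with the default of 0.9, later passes are skipped only when the first pass removed fewer than 10% of the input rows (for example, 1,000,000 input rows still leaving more than 900,000 afterwards), so the extra data pushed to shuffle is limited to cases where aggregation was barely helping. A lower value such as 0.5 would also skip when half the rows survive, saving GPU aggregation work at the cost of a larger shuffle.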


val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
.doc("Force a single pass partial sort agg to happen in all cases that it could, " +
@@ -3043,6 +3051,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)

lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO)

lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP)

lazy val maxRegExpStateMemory: Long = {