Add a heuristic to skip second or third agg pass #10950
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package com.nvidia.spark.rapids
 import java.util

 import scala.annotation.tailrec
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 import scala.collection.mutable

 import ai.rapids.cudf
@@ -549,7 +550,8 @@ object GpuAggregateIterator extends Logging {
 object GpuAggFirstPassIterator {
   def apply(cbIter: Iterator[ColumnarBatch],
       aggHelper: AggHelper,
-      metrics: GpuHashAggregateMetrics): Iterator[SpillableColumnarBatch] = {
+      metrics: GpuHashAggregateMetrics
+  ): Iterator[SpillableColumnarBatch] = {
     val preprocessProjectIter = cbIter.map { cb =>
       val sb = SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
       aggHelper.preStepBound.projectAndCloseWithRetrySingleBatch(sb)
@@ -707,6 +709,9 @@ object GpuAggFinalPassIterator {
  * @param metrics metrics that will be updated during aggregation
  * @param configuredTargetBatchSize user-specified value for the targeted input batch size
  * @param useTieredProject user-specified option to enable tiered projections
+ * @param allowNonFullyAggregatedOutput whether it is allowed to skip the third aggregation pass
+ * @param skipAggPassReductionRatio skip the next aggregation pass if the previous pass kept more than this ratio of its input rows
+ * @param localInputRowsCount metric to track the number of input rows processed locally
  */
 class GpuMergeAggregateIterator(
     firstPassIter: Iterator[SpillableColumnarBatch],
@@ -718,15 +723,21 @@ class GpuMergeAggregateIterator(
     modeInfo: AggregateModeInfo,
     metrics: GpuHashAggregateMetrics,
     configuredTargetBatchSize: Long,
-    useTieredProject: Boolean)
+    useTieredProject: Boolean,
+    allowNonFullyAggregatedOutput: Boolean,
+    skipAggPassReductionRatio: Double,
+    localInputRowsCount: LocalGpuMetric)
   extends Iterator[ColumnarBatch] with AutoCloseable with Logging {
   private[this] val isReductionOnly = groupingExpressions.isEmpty
   private[this] val targetMergeBatchSize = computeTargetMergeBatchSize(configuredTargetBatchSize)
   private[this] val aggregatedBatches = new util.ArrayDeque[SpillableColumnarBatch]
   private[this] var outOfCoreIter: Option[GpuOutOfCoreSortIterator] = None

-  /** Iterator for fetching aggregated batches if a sort-based fallback has occurred */
-  private[this] var sortFallbackIter: Option[Iterator[ColumnarBatch]] = None
+  /** Iterator for fetching aggregated batches, used if either:
+   * 1. a sort-based fallback has occurred, or
+   * 2. the third aggregation pass has been skipped
+   */
+  private[this] var fallbackIter: Option[Iterator[ColumnarBatch]] = None

   /** Whether a batch is pending for a reduction-only aggregation */
   private[this] var hasReductionOnlyBatch: Boolean = isReductionOnly
@@ -739,24 +750,61 @@ class GpuMergeAggregateIterator(
   }

   override def hasNext: Boolean = {
-    sortFallbackIter.map(_.hasNext).getOrElse {
+    fallbackIter.map(_.hasNext).getOrElse {
       // reductions produce a result even if the input is empty
       hasReductionOnlyBatch || !aggregatedBatches.isEmpty || firstPassIter.hasNext
     }
   }

   override def next(): ColumnarBatch = {
-    sortFallbackIter.map(_.next()).getOrElse {
+    fallbackIter.map(_.next()).getOrElse {
+      var shouldSkipThirdPassAgg = false
+
       // aggregate and merge all pending inputs
       if (firstPassIter.hasNext) {
-        aggregateInputBatches()
-        tryMergeAggregatedBatches()
+        // first pass agg
+        val rowsAfterFirstPassAgg = aggregateInputBatches()
+
+        // by now firstPassIter has been traversed, so localInputRowsCount is finished updating
+        if (isReductionOnly ||
+          skipAggPassReductionRatio * localInputRowsCount.value >= rowsAfterFirstPassAgg) {
+          // second pass agg
+          tryMergeAggregatedBatches()
+
+          val rowsAfterSecondPassAgg = aggregatedBatches.asScala.foldLeft(0L) {
+            (totalRows, batch) => totalRows + batch.numRows()
+          }
+          shouldSkipThirdPassAgg =
+            rowsAfterSecondPassAgg > skipAggPassReductionRatio * rowsAfterFirstPassAgg

Inline review discussion:

Reviewer: Could you please correct me if I get anything wrong about the algorithm. It looks like we do a first pass over the data and we place the results in the aggregatedBatches queue: we ask the firstPassIterator if there is more to process; firstPassIterator reads in the original input data, does the pre-project, and does an initial aggregation, and aggregateInputBatches reads in those input batches and places them into the queue. If the first pass reduced the number of rows enough, we go on to a second pass; otherwise we skip it and set "shouldSkipThirdPassAgg", which is confusing because we are not doing a third pass, this would be a second aggregation pass, but whatever. Combining the results together is done by calling tryMergeAggregatedBatches(). At that point, if we didn't reduce the number of rows by X% again, then we will also set "shouldSkipThirdPassAgg". At this point either the data is aggregated into a single batch that we can output, or we have multiple batches to deal with. If we are going to skip the third pass agg (and it is not a final aggregation), then we just output what we have done so far. Otherwise we do the sort fallback iterator.

That looks okay to me, but I am curious if you think we are being aggressive enough with this, and do we want follow-on work to see if we can make it more aggressive? I am just a little concerned that we might do an entire first pass over multiple batches when it is clear from the very first batch that this is just not going to work. We have to be very careful that we don't change the order of the batches so that things like first and last still work correctly, but that should not be too difficult. What if we read an input batch and check if it reduced the number of rows by X%; if not, we just instantly return it. No need to cache that batch at all. We just let downstream deal with the large batch we just saw. We keep doing that until we hit a situation where we did see something reduce in size. Then we start to cache batches, so that we don't mess up the order. I am concerned about spilling and memory pressure. A map-side aggregation is almost always going to feed into a shuffle, and if we can release some of the batches quickly we reduce the GPU memory pressure, because they should be pulled back off of the GPU without us needing to cache anything.

Author: second pass agg -> tryMergeAggregatedBatches(), third pass agg -> buildSortFallbackIterator(); the naming should be correct.
+        } else {
+          shouldSkipThirdPassAgg = true
+          logInfo(s"Rows after first pass aggregation $rowsAfterFirstPassAgg exceeds " +
+            s"${skipAggPassReductionRatio * 100}% of " +
+            s"localInputRowsCount ${localInputRowsCount.value}, skip the second pass agg")
+        }
       }

       if (aggregatedBatches.size() > 1) {
-        // Unable to merge to a single output, so must fall back to a sort-based approach.
-        sortFallbackIter = Some(buildSortFallbackIterator())
-        sortFallbackIter.get.next()
+        // Unable to merge to a single output, so must fall back
+        if (allowNonFullyAggregatedOutput && shouldSkipThirdPassAgg) {
+          // skip third pass agg, return the aggregated batches directly
+          logInfo(s"Rows after second pass aggregation exceeds " +
+            s"${skipAggPassReductionRatio * 100}% of " +
+            s"rows after first pass, skip the third pass agg")
+          fallbackIter = Some(new Iterator[ColumnarBatch] {
+            override def hasNext: Boolean = !aggregatedBatches.isEmpty
+
+            override def next(): ColumnarBatch = {
+              withResource(aggregatedBatches.pop()) { spillableBatch =>
+                spillableBatch.getColumnarBatch()
+              }
+            }
+          })
+        } else {
+          // fallback to sort agg, this is the third pass agg
+          fallbackIter = Some(buildSortFallbackIterator())
+        }
+        fallbackIter.get.next()
       } else if (aggregatedBatches.isEmpty) {
         if (hasReductionOnlyBatch) {
           hasReductionOnlyBatch = false
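To make the two decision points above concrete, here is a minimal standalone sketch of the heuristic under the default ratio of 0.9; the object and method names are illustrative only and are not part of the plugin:

// Standalone sketch of the pass-skipping heuristic, mirroring the checks above.
object SkipAggPassHeuristic {
  // Run the second (merge) pass only for reduction-only aggregations or when the
  // first pass shrank the data enough: rowsAfterFirstPass <= ratio * inputRows.
  def shouldRunSecondPass(isReductionOnly: Boolean, inputRows: Long,
      rowsAfterFirstPass: Long, ratio: Double): Boolean =
    isReductionOnly || ratio * inputRows >= rowsAfterFirstPass

  // Skip the third (sort-based) pass when the second pass kept more than
  // `ratio` of the rows it was given.
  def shouldSkipThirdPass(rowsAfterFirstPass: Long, rowsAfterSecondPass: Long,
      ratio: Double): Boolean =
    rowsAfterSecondPass > ratio * rowsAfterFirstPass

  def main(args: Array[String]): Unit = {
    // 1,000,000 input rows -> 950,000 after the first pass is only a 5% reduction,
    // so with ratio 0.9 the merge pass is skipped and batches go straight downstream.
    println(shouldRunSecondPass(isReductionOnly = false, 1000000L, 950000L, 0.9)) // false
    // 1,000,000 -> 400,000 clears the bar, so the merge pass runs.
    println(shouldRunSecondPass(isReductionOnly = false, 1000000L, 400000L, 0.9)) // true
    // 400,000 -> 380,000 in the merge pass is again only a 5% reduction, so a
    // non-final stage would skip the sort-based third pass.
    println(shouldSkipThirdPass(400000L, 380000L, 0.9)) // true
  }
}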
@@ -779,7 +827,7 @@ class GpuMergeAggregateIterator(
     aggregatedBatches.clear()
     outOfCoreIter.foreach(_.close())
     outOfCoreIter = None
-    sortFallbackIter = None
+    fallbackIter = None
     hasReductionOnlyBatch = false
   }

@@ -789,11 +837,15 @@ class GpuMergeAggregateIterator( | |
} | ||
|
||
/** Aggregate all input batches and place the results in the aggregatedBatches queue. */ | ||
private def aggregateInputBatches(): Unit = { | ||
private def aggregateInputBatches(): Long = { | ||
var rowsAfter = 0L | ||
// cache everything in the first pass | ||
while (firstPassIter.hasNext) { | ||
aggregatedBatches.add(firstPassIter.next()) | ||
val batch = firstPassIter.next() | ||
rowsAfter += batch.numRows() | ||
aggregatedBatches.add(batch) | ||
} | ||
rowsAfter | ||
} | ||
|
||
/** | ||
|
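Note that aggregateInputBatches still caches every first-pass batch before any skipping decision is made. The reviewer's follow-on idea in the discussion above (emit a batch downstream immediately when it did not reduce enough, and only start caching once a batch actually shrinks, so batch order is preserved) could look roughly like the sketch below; RowBatch and emitEarly are simplified placeholders, not the plugin's types:

import scala.collection.mutable

// Placeholder for a batch that knows how many rows went into and came out of
// its first-pass aggregation.
final case class RowBatch(inputRows: Long, outputRows: Long)

// Pass through batches that did not reduce enough; once a batch does shrink,
// cache it and everything after it so the original order is preserved. The
// cached batches would then feed the normal merge/sort passes.
def emitEarly(batches: Iterator[RowBatch], ratio: Double): Iterator[RowBatch] = {
  val cached = mutable.Queue.empty[RowBatch]
  var caching = false
  val passedThrough = batches.flatMap { b =>
    if (!caching && b.outputRows > ratio * b.inputRows) {
      Iterator.single(b) // did not reduce enough: hand it downstream right away
    } else {
      caching = true
      cached += b
      Iterator.empty
    }
  }
  // cached.iterator is only built after passedThrough is exhausted (++ is lazy)
  passedThrough ++ cached.iterator
}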
@@ -1115,8 +1167,8 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](

   /*
    * Type inferencing by the Scala compiler will choose the most specific return type
-   * something like Array[Set[Product with Serializable with AggregateMode]] or with
-   * slight differences depending on Scala version. Here we ensure this is
+   * something like Array[Set[Product with Serializable with AggregateMode]] or with
+   * slight differences depending on Scala version. Here we ensure this is
    * Array[Set[AggregateMode]] to perform the subsequent Set and Array operations properly.
    */
   val aggPatternsCanReplace = strPatternToReplace.split("\\|").map { subPattern =>
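As a rough illustration of the inference issue that comment describes (Spark's aggregate modes are case objects, so a literal Array of Sets picks up an overly specific element type unless it is pinned; the snippet is an assumption-laden sketch, not code from this PR):

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateMode, Final, Partial, PartialMerge}

// Left to inference, the element type comes out as something like
// Set[Product with Serializable with AggregateMode], varying slightly by Scala version.
val inferred = Array(Set(Partial, PartialMerge), Set(Final))
// Pinning the type keeps the later Set and Array operations over AggregateMode simple.
val pinned: Array[Set[AggregateMode]] = Array(Set(Partial, PartialMerge), Set(Final))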
@@ -1189,6 +1241,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
       mode == Partial || mode == PartialMerge
     } && agg.groupingExpressions.nonEmpty // Don't do this for a reduce...

+  // for an aggregateExpressions.isEmpty case, we cannot distinguish between final and non-final,
+  // so don't allow it.
+  lazy val allowNonFullyAggregatedOutput = aggModes.forall { mode =>
+    mode == Partial || mode == PartialMerge
+  } && agg.aggregateExpressions.nonEmpty
+
   lazy val groupingCanBeSorted = agg.groupingExpressions.forall { expr =>
     orderable.isSupportedByPlugin(expr.dataType)
   }
@@ -1272,7 +1330,9 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan](
       useTiered,
       estimatedPreProcessGrowth,
       conf.forceSinglePassPartialSortAgg,
-      allowSinglePassAgg)
+      allowSinglePassAgg,
+      allowNonFullyAggregatedOutput,
+      conf.skipAggPassReductionRatio)
   }
 }

@@ -1358,7 +1418,9 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega
         // For now we are just going to go with the original hash aggregation
         1.0,
         false,
-        false)
+        false,
+        false,
+        1)
     } else {
       super.convertToGpu()
     }
@@ -1707,6 +1769,10 @@ object GpuHashAggregateExecBase {
  * @param child incoming plan (where we get input columns from)
  * @param configuredTargetBatchSize user-configured maximum device memory size of a batch
  * @param configuredTieredProjectEnabled configurable optimization to use tiered projections
+ * @param allowNonFullyAggregatedOutput whether we can skip the third pass of aggregation
+ *                                      (can omit non fully aggregated data for non-final
+ *                                      stage of aggregation)
+ * @param skipAggPassReductionRatio skip the next pass if the previous pass kept more than this ratio of its input rows
  */
 case class GpuHashAggregateExec(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
@@ -1719,7 +1785,10 @@ case class GpuHashAggregateExec(
     configuredTieredProjectEnabled: Boolean,
     estimatedPreProcessGrowth: Double,
     forceSinglePassAgg: Boolean,
-    allowSinglePassAgg: Boolean) extends ShimUnaryExecNode with GpuExec {
+    allowSinglePassAgg: Boolean,
+    allowNonFullyAggregatedOutput: Boolean,
+    skipAggPassReductionRatio: Double
+) extends ShimUnaryExecNode with GpuExec {

   // lifted directly from `BaseAggregateExec.inputAttributes`, edited comment.
   def inputAttributes: Seq[Attribute] =
@@ -1804,7 +1873,7 @@ case class GpuHashAggregateExec(
       boundGroupExprs, aggregateExprs, aggregateAttrs, resultExprs, modeInfo,
       localEstimatedPreProcessGrowth, alreadySorted, expectedOrdering,
       postBoundReferences, targetBatchSize, aggMetrics, useTieredProject,
-      localForcePre, localAllowPre)
+      localForcePre, localAllowPre, allowNonFullyAggregatedOutput, skipAggPassReductionRatio)
   }
 }

@@ -1920,7 +1989,10 @@ class DynamicGpuPartialSortAggregateIterator(
     metrics: GpuHashAggregateMetrics,
     useTiered: Boolean,
     forceSinglePassAgg: Boolean,
-    allowSinglePassAgg: Boolean) extends Iterator[ColumnarBatch] {
+    allowSinglePassAgg: Boolean,
+    allowNonFullyAggregatedOutput: Boolean,
+    skipAggPassReductionRatio: Double
+) extends Iterator[ColumnarBatch] {
   private var aggIter: Option[Iterator[ColumnarBatch]] = None
   private[this] val isReductionOnly = boundGroupExprs.outputTypes.isEmpty

@@ -1998,7 +2070,14 @@ class DynamicGpuPartialSortAggregateIterator(
       inputAttrs.map(_.dataType).toArray, preProcessAggHelper.preStepBound,
       metrics.opTime, metrics.numPreSplits)

-    val firstPassIter = GpuAggFirstPassIterator(splitInputIter, preProcessAggHelper, metrics)
+    val localInputRowsMetrics = new LocalGpuMetric
+    val firstPassIter = GpuAggFirstPassIterator(
+      splitInputIter.map(cb => {
+        localInputRowsMetrics += cb.numRows()
+        cb
+      }),
+      preProcessAggHelper,
+      metrics)

     val mergeIter = new GpuMergeAggregateIterator(
       firstPassIter,
@@ -2010,7 +2089,10 @@ class DynamicGpuPartialSortAggregateIterator(
       modeInfo,
       metrics,
       configuredTargetBatchSize,
-      useTiered)
+      useTiered,
+      allowNonFullyAggregatedOutput,
+      skipAggPassReductionRatio,
+      localInputRowsMetrics)

     GpuAggFinalPassIterator.makeIter(mergeIter, postBoundReferences, metrics)
   }
@@ -1509,6 +1509,14 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
     .booleanConf
     .createWithDefault(true)

+  val SKIP_AGG_PASS_REDUCTION_RATIO = conf("spark.rapids.sql.agg.skipAggPassReductionRatio")
+    .doc("In non-final aggregation stages, if the previous pass has a row reduction ratio " +
+      "greater than this value, the next aggregation pass will be skipped. " +
+      "Setting this to 1 essentially disables this feature.")
+    .doubleConf
+    .checkValue(v => v >= 0 && v <= 1, "The ratio value must be in [0, 1].")
+    .createWithDefault(0.9)
+
Inline review discussion:

Reviewer: How did you reach 90% as the cutoff? Is it just arbitrary, or did you do some tuning/testing to figure this out?

Author: It is arbitrary. I chose 0.9 in fear of incurring too much pressure on shuffle.
   val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
     conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
       .doc("Force a single pass partial sort agg to happen in all cases that it could, " +
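For reference, a user would tune the new knob (or effectively disable it, per the doc string, by setting it to 1) when building the session. The snippet below is a generic example; only the config key comes from this PR, the rest is standard Spark session setup:

import org.apache.spark.sql.SparkSession

// Example: turn the pass-skipping heuristic off for this job by setting the ratio to 1.0.
val spark = SparkSession.builder()
  .appName("skip-agg-pass-example")
  .config("spark.rapids.sql.agg.skipAggPassReductionRatio", "1.0")
  .getOrCreate()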
@@ -3043,6 +3051,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)

+  lazy val skipAggPassReductionRatio: Double = get(SKIP_AGG_PASS_REDUCTION_RATIO)
+
   lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP)

   lazy val maxRegExpStateMemory: Long = {
Inline review discussion:

Comment: this is a workaround for #8652
Reply: This is okay as a short-term workaround, but I am not going to approve a PR with this in it.