Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds pre/post steps for merge and update aggregate #3417

Merged
merged 16 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 94 additions & 69 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.annotation.tailrec
import scala.collection.mutable

import ai.rapids.cudf
import ai.rapids.cudf.{DType, NvtxColor, Scalar}
import ai.rapids.cudf.{GroupByAggregationOnColumn, NvtxColor, Scalar}
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode
Expand All @@ -31,7 +31,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, If, NamedExpression, NullsFirst, Projection, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, If, NamedExpression, NullsFirst}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand All @@ -42,8 +42,8 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter}
import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil}
import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object AggregateUtils {

Expand Down Expand Up @@ -124,10 +124,13 @@ object AggregateUtils {
* Compute the aggregation modes and aggregate expressions for all aggregation expressions
* @param aggExpressions the aggregate expressions
* @param aggBufferAttributes attributes to be bound to the aggregate expressions
revans2 marked this conversation as resolved.
Show resolved Hide resolved
* @param mergeBufferAttributes merge attributes to be bound to the merge expressions
*/
def computeAggModeCudfAggregates(
aggExpressions: Seq[GpuAggregateExpression],
aggBufferAttributes: Seq[Attribute]): Seq[(AggregateMode, Seq[CudfAggregate])] = {
aggBufferAttributes: Seq[Attribute],
mergeBufferAttributes: Seq[Attribute]): Seq[(
GpuAggregateExpression, AggregateMode, Seq[CudfAggregate])] = {
//
// update expressions are those performed on the raw input data
// e.g. for count it's count, and for average it's sum and count.
Expand All @@ -145,10 +148,10 @@ object AggregateUtils {
GpuBindReferences.bindGpuReferences(updateExpressionsSeq(modeIndex), aggBufferAttributes)
.asInstanceOf[Seq[CudfAggregate]]
} else {
GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), aggBufferAttributes)
GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), mergeBufferAttributes)
.asInstanceOf[Seq[CudfAggregate]]
}
(expr.mode, cudfAggregates)
(expr, expr.mode, cudfAggregates)
}
}
}
Expand Down Expand Up @@ -241,7 +244,7 @@ class GpuHashAggregateIterator(
boundInputReferences: Seq[GpuExpression],
boundFinalProjections: Option[Seq[GpuExpression]],
boundResultReferences: Seq[Expression],
aggModeCudfAggregates: Seq[(AggregateMode, Seq[CudfAggregate])])
aggModeCudfAggregates: Seq[(GpuAggregateExpression, AggregateMode, Seq[CudfAggregate])])
revans2 marked this conversation as resolved.
Show resolved Hide resolved

Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close()))

Expand Down Expand Up @@ -305,20 +308,20 @@ class GpuHashAggregateIterator(
}

private def computeTargetMergeBatchSize(confTargetSize: Long): Long = {
val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2)
val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._3)
val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType)
AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly)
}

/** Aggregate all input batches and place the results in the aggregatedBatches queue. */
private def aggregateInputBatches(): Unit = {
while (cbIter.hasNext) {
val (childCvs, isLastInputBatch) = withResource(cbIter.next()) { inputBatch =>
val (childBatch, isLastInputBatch) = withResource(cbIter.next()) { inputBatch =>
val isLast = GpuColumnVector.isTaggedAsFinalBatch(inputBatch)
(processIncomingBatch(inputBatch), isLast)
}
withResource(childCvs) { _ =>
withResource(computeAggregate(childCvs, merge = false)) { aggBatch =>
withResource(childBatch) { _ =>
withResource(computeAggregate(childBatch, merge = false)) { aggBatch =>
val batch = LazySpillableColumnarBatch(aggBatch, metrics.spillCallback, "aggbatch")
// Avoid making batch spillable for the common case of the last and only batch
if (!(isLastInputBatch && aggregatedBatches.isEmpty)) {
Expand Down Expand Up @@ -427,7 +430,10 @@ class GpuHashAggregateIterator(
batches: mutable.ArrayBuffer[LazySpillableColumnarBatch]): LazySpillableColumnarBatch = {
withResource(batches) { _ =>
withResource(concatenateBatches(batches)) { concatVectors =>
withResource(computeAggregate(concatVectors, merge = true)) { mergedBatch =>
val concatBatch = new ColumnarBatch(
concatVectors.toArray,
concatVectors.head.getRowCount.toInt)
revans2 marked this conversation as resolved.
Show resolved Hide resolved
withResource(computeAggregate(concatBatch, merge = true)) { mergedBatch =>
LazySpillableColumnarBatch(mergedBatch, metrics.spillCallback, "agg merged batch")
}
}
Expand Down Expand Up @@ -461,7 +467,7 @@ class GpuHashAggregateIterator(
val aggBufferAttributes = groupingAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
val sorter = new GpuSorter(ordering, aggBufferAttributes)
val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2)
val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._3)
val aggBatchTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType)

// Use the out of core sort iterator to sort the batches by grouping key
Expand Down Expand Up @@ -503,7 +509,7 @@ class GpuHashAggregateIterator(
override def next(): ColumnarBatch = {
// batches coming out of the sort need to be merged
withResource(keyBatchingIter.next()) { batch =>
computeAggregate(GpuColumnVector.extractColumns(batch), merge = true, isSorted = true)
computeAggregate(batch, merge = true, isSorted = true)
}
}
}
Expand Down Expand Up @@ -567,10 +573,10 @@ class GpuHashAggregateIterator(
}

/** Perform the initial projection on the input batch and extract the result columns */
private def processIncomingBatch(batch: ColumnarBatch): Seq[GpuColumnVector] = {
private def processIncomingBatch(batch: ColumnarBatch): ColumnarBatch = {
val aggTime = metrics.computeAggTime
withResource(new NvtxWithMetrics("prep agg batch", NvtxColor.CYAN, aggTime)) { _ =>
boundExpressions.boundInputReferences.safeMap { ref =>
val cols = boundExpressions.boundInputReferences.safeMap { ref =>
val childCv = GpuExpressionsUtils.columnarEvalToColumn(ref, batch)
if (childCv.dataType == ref.dataType) {
childCv
revans2 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -581,6 +587,7 @@ class GpuHashAggregateIterator(
}
}
}
new ColumnarBatch(cols.toArray, cols.head.getRowCount.toInt)
}
}

Expand Down Expand Up @@ -623,9 +630,12 @@ class GpuHashAggregateIterator(
val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute)
val aggBufferAttributes = groupingAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
val mergeBufferAttributes = groupingAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.mergeBufferAttributes)

val aggModeCudfAggregates =
AggregateUtils.computeAggModeCudfAggregates(aggregateExpressions, aggBufferAttributes)
AggregateUtils.computeAggModeCudfAggregates(
aggregateExpressions, aggBufferAttributes, mergeBufferAttributes)

// boundInputReferences is used to pick out of the input batch the appropriate columns
// for aggregation.
Expand Down Expand Up @@ -757,15 +767,20 @@ class GpuHashAggregateIterator(

/**
* Compute the aggregations on the projected input columns.
* @param toAggregateCvs column vectors representing the input batch to aggregate
* @param toAggregateBatch input batch to aggregate
* @param merge true indicates a merge aggregation should be performed
* @param isSorted true indicates the data is already sorted by the grouping keys
* @return aggregated batch
*/
private def computeAggregate(
toAggregateCvs: Seq[GpuColumnVector],
toAggregateBatch: ColumnarBatch,
merge: Boolean,
isSorted: Boolean = false): ColumnarBatch = {
val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute)
revans2 marked this conversation as resolved.
Show resolved Hide resolved

val aggBufferAttributes = groupingAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

val aggModeCudfAggregates = boundExpressions.aggModeCudfAggregates
val computeAggTime = metrics.computeAggTime
withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime)) { _ =>
Expand All @@ -777,73 +792,83 @@ class GpuHashAggregateIterator(
// For example: GpuAverage has an update version of: (CudfSum, CudfCount)
// and CudfCount has an update version of AggregateOp.COUNT and a
// merge version of AggregateOp.COUNT.
val aggregates = aggModeCudfAggregates.flatMap(_._2)
val cudfAggregates = aggModeCudfAggregates.flatMap { case (mode, aggregates) =>
if ((mode == Partial || mode == Complete) && !merge) {
aggregates.map(a => a.updateAggregate.onColumn(a.getOrdinal(a.ref)))
var dataTypes = new mutable.ArrayBuffer[DataType]()
val cudfAggregates = new mutable.ArrayBuffer[GroupByAggregationOnColumn]()

// `GpuAggregateFunction` can add a pre and post step for update
// and merge aggregates.
val preStep = new mutable.ArrayBuffer[Expression]()
val postStep = new mutable.ArrayBuffer[Expression]()
val postStepAttr = new mutable.ArrayBuffer[Attribute]()

// we add the grouping expressions first, which are bound as pass-through
preStep ++= GpuBindReferences.bindGpuReferences(
groupingAttributes, groupingAttributes)
postStep ++= GpuBindReferences.bindGpuReferences(
groupingAttributes, groupingAttributes)
postStepAttr ++= groupingAttributes
dataTypes ++=
groupingExpressions.map(_.dataType)

for ((aggExp, mode, aggregates) <- aggModeCudfAggregates) {
val aggFn = aggExp.aggregateFunction
if ((mode == Partial || mode == Complete) && ! merge) {
preStep ++= aggFn.preUpdate
postStep ++= aggFn.postUpdate
postStepAttr ++= aggFn.postUpdateAttr
} else {
aggregates.map(a => a.mergeAggregate.onColumn(a.getOrdinal(a.ref)))
preStep ++= aggFn.preMerge
postStep ++= aggFn.postMerge
postStepAttr ++= aggFn.postMergeAttr
}

aggregates.map { a =>
if ((mode == Partial || mode == Complete) && !merge) {
cudfAggregates += a.updateAggregate
dataTypes += a.updateDataType
} else {
cudfAggregates += a.mergeAggregate
dataTypes += a.dataType
}
}
}
val groupOptions = cudf.GroupByOptions.builder()
.withIgnoreNullKeys(false)
.withKeysSorted(isSorted)
.build()
val result = withResource(new cudf.Table(toAggregateCvs.map(_.getBase): _*)) { tbl =>
tbl.groupBy(groupOptions, groupingExpressions.indices: _*).aggregate(cudfAggregates: _*)
}
withResource(result) { result =>
// Turn aggregation into a ColumnarBatch for the result evaluation
// Note that the resulting ColumnarBatch has the following shape:
//
// [key1, key2, ..., keyN, cudfAgg1, cudfAgg2, ..., cudfAggN]
//
// where cudfAgg_i can be multiple columns for each Spark aggregate
// (i.e. partial_gpuavg => cudf sum and cudf count)
//
// The type of the columns returned by aggregate depends on cudf. A count of a long column
// may return a 32bit column, which is bad if you are trying to concatenate batches
// later. Cast here to the type that the aggregate expects (e.g. Long in case of count)
val dataTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType)

val resCols = mutable.ArrayBuffer.empty[ColumnVector]
closeOnExcept(resCols) { resCols =>
(0 until result.getNumberOfColumns).foldLeft(resCols) { case (ret, i) =>
val column = result.getColumn(i)
val rapidsType = dataTypes(i) match {
case dt if GpuColumnVector.isNonNestedSupportedType(dt) =>
GpuColumnVector.getNonNestedRapidsType(dataTypes(i))
case dt: ArrayType if GpuColumnVector.typeConversionAllowed(column, dt) =>
DType.LIST
case dt: MapType if GpuColumnVector.typeConversionAllowed(column, dt) =>
DType.LIST
case dt: StructType if GpuColumnVector.typeConversionAllowed(column, dt) =>
DType.STRUCT
case dt =>
throw new IllegalArgumentException(s"Can NOT convert $column to data type $dt.")
}
// cast will be cheap if type matches, only does refCount++ in that case
withResource(column.castTo(rapidsType)) { castedCol =>
ret += GpuColumnVector.from(castedCol.incRefCount(), dataTypes(i))

// a pre-processing step required before we go into the cuDF aggregate, in some cases
// casting and in others creating a struct (MERGE_M2 for instance, requires a struct)
val preStepBound = GpuBindReferences.bindGpuReferences(preStep, aggBufferAttributes)
withResource(GpuProjectExec.project(toAggregateBatch, preStepBound)) { preProcessed =>
withResource(GpuColumnVector.from(preProcessed)) { preProcessedTbl =>
val groupOptions = cudf.GroupByOptions.builder()
.withIgnoreNullKeys(false)
.withKeysSorted(isSorted)
.build()

// perform the aggregate
withResource(preProcessedTbl
.groupBy(groupOptions, groupingExpressions.indices: _*)
.aggregate(cudfAggregates: _*)) { result =>
withResource(GpuColumnVector.from(result, dataTypes.toArray)) { resultBatch =>
// a post-processing step required in some scenarios, casting or picking
// apart a struct
val postStepBound = GpuBindReferences.bindGpuReferences(postStep, postStepAttr)
GpuProjectExec.project(resultBatch, postStepBound)
}
ret
}
new ColumnarBatch(resCols.toArray, result.getRowCount.toInt)
}
}
} else {
// Reduction aggregate
// we ask the appropriate merge or update CudfAggregates what their
// reduction merge or update aggregate functions are
val cvs = mutable.ArrayBuffer[GpuColumnVector]()
aggModeCudfAggregates.foreach { case (mode, aggs) =>
aggModeCudfAggregates.foreach { case (_, mode, aggs) =>
aggs.foreach { agg =>
val aggFn = if ((mode == Partial || mode == Complete) && !merge) {
agg.updateReductionAggregate
} else {
agg.mergeReductionAggregate
}
withResource(aggFn(toAggregateCvs(agg.getOrdinal(agg.ref)).getBase)) { res =>
withResource(aggFn(GpuColumnVector.extractColumns(toAggregateBatch))) { res =>
val rapidsType = GpuColumnVector.getNonNestedRapidsType(agg.dataType)
withResource(cudf.ColumnVector.fromScalar(res, 1)) { cv =>
cvs += GpuColumnVector.from(cv.castTo(rapidsType), agg.dataType)
Expand Down
Loading