From e33ffa44fd62df5fa4b2ab6a600390b49c96e3c7 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 24 Aug 2021 13:54:46 -0600 Subject: [PATCH 01/13] Refactor aggregate functions to add pre/post update and merge Signed-off-by: Alessandro Bellina --- .../com/nvidia/spark/rapids/aggregate.scala | 163 +++++++++------- .../spark/sql/rapids/AggregateFunctions.scala | 180 +++++++++++++----- 2 files changed, 224 insertions(+), 119 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index a696322b43b..804b81a2c7f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -22,7 +22,7 @@ import scala.annotation.tailrec import scala.collection.mutable import ai.rapids.cudf -import ai.rapids.cudf.{DType, NvtxColor, Scalar} +import ai.rapids.cudf.{GroupByAggregationOnColumn, NvtxColor, Scalar} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode @@ -31,7 +31,7 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, If, NamedExpression, NullsFirst, Projection, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, AttributeSeq, AttributeSet, Expression, ExprId, If, NamedExpression, NullsFirst} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -42,8 +42,8 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CpuToGpuAggregateBufferConverter, CudfAggregate, GpuAggregateExpression, GpuToCpuAggregateBufferConverter} import org.apache.spark.sql.rapids.execution.{GpuShuffleMeta, TrampolineUtil} -import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType, StructType} -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.types.{ArrayType, DataType, LongType, MapType} +import org.apache.spark.sql.vectorized.ColumnarBatch object AggregateUtils { @@ -127,7 +127,9 @@ object AggregateUtils { */ def computeAggModeCudfAggregates( aggExpressions: Seq[GpuAggregateExpression], - aggBufferAttributes: Seq[Attribute]): Seq[(AggregateMode, Seq[CudfAggregate])] = { + aggBufferAttributes: Seq[Attribute], + mergeBufferAttributes: Seq[Attribute]): Seq[( + GpuAggregateExpression, AggregateMode, Seq[CudfAggregate])] = { // // update expressions are those performed on the raw input data // e.g. for count it's count, and for average it's sum and count. @@ -145,10 +147,10 @@ object AggregateUtils { GpuBindReferences.bindGpuReferences(updateExpressionsSeq(modeIndex), aggBufferAttributes) .asInstanceOf[Seq[CudfAggregate]] } else { - GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), aggBufferAttributes) + GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), mergeBufferAttributes) .asInstanceOf[Seq[CudfAggregate]] } - (expr.mode, cudfAggregates) + (expr, expr.mode, cudfAggregates) } } } @@ -241,7 +243,7 @@ class GpuHashAggregateIterator( boundInputReferences: Seq[GpuExpression], boundFinalProjections: Option[Seq[GpuExpression]], boundResultReferences: Seq[Expression], - aggModeCudfAggregates: Seq[(AggregateMode, Seq[CudfAggregate])]) + aggModeCudfAggregates: Seq[(GpuAggregateExpression, AggregateMode, Seq[CudfAggregate])]) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close())) @@ -305,7 +307,7 @@ class GpuHashAggregateIterator( } private def computeTargetMergeBatchSize(confTargetSize: Long): Long = { - val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2) + val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._3) val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly) } @@ -313,12 +315,12 @@ class GpuHashAggregateIterator( /** Aggregate all input batches and place the results in the aggregatedBatches queue. */ private def aggregateInputBatches(): Unit = { while (cbIter.hasNext) { - val (childCvs, isLastInputBatch) = withResource(cbIter.next()) { inputBatch => + val (childBatch, isLastInputBatch) = withResource(cbIter.next()) { inputBatch => val isLast = GpuColumnVector.isTaggedAsFinalBatch(inputBatch) (processIncomingBatch(inputBatch), isLast) } - withResource(childCvs) { _ => - withResource(computeAggregate(childCvs, merge = false)) { aggBatch => + withResource(childBatch) { _ => + withResource(computeAggregate(childBatch, merge = false)) { aggBatch => val batch = LazySpillableColumnarBatch(aggBatch, metrics.spillCallback, "aggbatch") // Avoid making batch spillable for the common case of the last and only batch if (!(isLastInputBatch && aggregatedBatches.isEmpty)) { @@ -427,7 +429,10 @@ class GpuHashAggregateIterator( batches: mutable.ArrayBuffer[LazySpillableColumnarBatch]): LazySpillableColumnarBatch = { withResource(batches) { _ => withResource(concatenateBatches(batches)) { concatVectors => - withResource(computeAggregate(concatVectors, merge = true)) { mergedBatch => + val concatBatch = new ColumnarBatch( + concatVectors.toArray, + concatVectors.head.getRowCount.toInt) + withResource(computeAggregate(concatBatch, merge = true)) { mergedBatch => LazySpillableColumnarBatch(mergedBatch, metrics.spillCallback, "agg merged batch") } } @@ -461,7 +466,7 @@ class GpuHashAggregateIterator( val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) val sorter = new GpuSorter(ordering, aggBufferAttributes) - val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2) + val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._3) val aggBatchTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) // Use the out of core sort iterator to sort the batches by grouping key @@ -503,7 +508,7 @@ class GpuHashAggregateIterator( override def next(): ColumnarBatch = { // batches coming out of the sort need to be merged withResource(keyBatchingIter.next()) { batch => - computeAggregate(GpuColumnVector.extractColumns(batch), merge = true, isSorted = true) + computeAggregate(batch, merge = true, isSorted = true) } } } @@ -567,10 +572,10 @@ class GpuHashAggregateIterator( } /** Perform the initial projection on the input batch and extract the result columns */ - private def processIncomingBatch(batch: ColumnarBatch): Seq[GpuColumnVector] = { + private def processIncomingBatch(batch: ColumnarBatch): ColumnarBatch = { val aggTime = metrics.computeAggTime withResource(new NvtxWithMetrics("prep agg batch", NvtxColor.CYAN, aggTime)) { _ => - boundExpressions.boundInputReferences.safeMap { ref => + val cols = boundExpressions.boundInputReferences.safeMap { ref => val childCv = GpuExpressionsUtils.columnarEvalToColumn(ref, batch) if (childCv.dataType == ref.dataType) { childCv @@ -581,6 +586,7 @@ class GpuHashAggregateIterator( } } } + new ColumnarBatch(cols.toArray, cols.head.getRowCount.toInt) } } @@ -623,9 +629,12 @@ class GpuHashAggregateIterator( val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) + val mergeBufferAttributes = groupingAttributes ++ + aggregateExpressions.flatMap(_.aggregateFunction.mergeBufferAttributes) val aggModeCudfAggregates = - AggregateUtils.computeAggModeCudfAggregates(aggregateExpressions, aggBufferAttributes) + AggregateUtils.computeAggModeCudfAggregates( + aggregateExpressions, aggBufferAttributes, mergeBufferAttributes) // boundInputReferences is used to pick out of the input batch the appropriate columns // for aggregation. @@ -757,15 +766,20 @@ class GpuHashAggregateIterator( /** * Compute the aggregations on the projected input columns. - * @param toAggregateCvs column vectors representing the input batch to aggregate + * @param toAggregateBatch input batch to aggregate * @param merge true indicates a merge aggregation should be performed * @param isSorted true indicates the data is already sorted by the grouping keys * @return aggregated batch */ private def computeAggregate( - toAggregateCvs: Seq[GpuColumnVector], + toAggregateBatch: ColumnarBatch, merge: Boolean, isSorted: Boolean = false): ColumnarBatch = { + val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) + + val aggBufferAttributes = groupingAttributes ++ + aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) + val aggModeCudfAggregates = boundExpressions.aggModeCudfAggregates val computeAggTime = metrics.computeAggTime withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime)) { _ => @@ -777,58 +791,69 @@ class GpuHashAggregateIterator( // For example: GpuAverage has an update version of: (CudfSum, CudfCount) // and CudfCount has an update version of AggregateOp.COUNT and a // merge version of AggregateOp.COUNT. - val aggregates = aggModeCudfAggregates.flatMap(_._2) - val cudfAggregates = aggModeCudfAggregates.flatMap { case (mode, aggregates) => - if ((mode == Partial || mode == Complete) && !merge) { - aggregates.map(a => a.updateAggregate.onColumn(a.getOrdinal(a.ref))) + var dataTypes = new mutable.ArrayBuffer[DataType]() + val cudfAggregates = new mutable.ArrayBuffer[GroupByAggregationOnColumn]() + + // `GpuAggregateFunction` can add a pre and post step for update + // and merge aggregates. + val preStep = new mutable.ArrayBuffer[Expression]() + val postStep = new mutable.ArrayBuffer[Expression]() + val postStepAttr = new mutable.ArrayBuffer[Attribute]() + + // we add the grouping expression first, which bind as pass-through + preStep ++= GpuBindReferences.bindGpuReferences( + groupingAttributes, groupingAttributes) + postStep ++= GpuBindReferences.bindGpuReferences( + groupingAttributes, groupingAttributes) + postStepAttr ++= groupingAttributes + dataTypes ++= + groupingExpressions.map(_.dataType) + + for ((aggExp, mode, aggregates) <- aggModeCudfAggregates) { + // bind pre-merge to the aggBufferAttributes (input) + val aggFn = aggExp.aggregateFunction + if ((mode == Partial || mode == Complete) && ! merge) { + preStep ++= aggFn.preUpdate + postStep ++= aggFn.postUpdate + postStepAttr ++= aggFn.postUpdateAttr } else { - aggregates.map(a => a.mergeAggregate.onColumn(a.getOrdinal(a.ref))) + preStep ++= aggFn.preMerge + postStep ++= aggFn.postMerge + postStepAttr ++= aggFn.postMergeAttr + } + + aggregates.map { a => + if ((mode == Partial || mode == Complete) && !merge) { + cudfAggregates += a.updateAggregate + dataTypes += a.updateDataType + } else { + cudfAggregates += a.mergeAggregate + dataTypes += a.dataType + } } } - val groupOptions = cudf.GroupByOptions.builder() - .withIgnoreNullKeys(false) - .withKeysSorted(isSorted) - .build() - val result = withResource(new cudf.Table(toAggregateCvs.map(_.getBase): _*)) { tbl => - tbl.groupBy(groupOptions, groupingExpressions.indices: _*).aggregate(cudfAggregates: _*) - } - withResource(result) { result => - // Turn aggregation into a ColumnarBatch for the result evaluation - // Note that the resulting ColumnarBatch has the following shape: - // - // [key1, key2, ..., keyN, cudfAgg1, cudfAgg2, ..., cudfAggN] - // - // where cudfAgg_i can be multiple columns foreach Spark aggregate - // (i.e. partial_gpuavg => cudf sum and cudf count) - // - // The type of the columns returned by aggregate depends on cudf. A count of a long column - // may return a 32bit column, which is bad if you are trying to concatenate batches - // later. Cast here to the type that the aggregate expects (e.g. Long in case of count) - val dataTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) - - val resCols = mutable.ArrayBuffer.empty[ColumnVector] - closeOnExcept(resCols) { resCols => - (0 until result.getNumberOfColumns).foldLeft(resCols) { case (ret, i) => - val column = result.getColumn(i) - val rapidsType = dataTypes(i) match { - case dt if GpuColumnVector.isNonNestedSupportedType(dt) => - GpuColumnVector.getNonNestedRapidsType(dataTypes(i)) - case dt: ArrayType if GpuColumnVector.typeConversionAllowed(column, dt) => - DType.LIST - case dt: MapType if GpuColumnVector.typeConversionAllowed(column, dt) => - DType.LIST - case dt: StructType if GpuColumnVector.typeConversionAllowed(column, dt) => - DType.STRUCT - case dt => - throw new IllegalArgumentException(s"Can NOT convert $column to data type $dt.") - } - // cast will be cheap if type matches, only does refCount++ in that case - withResource(column.castTo(rapidsType)) { castedCol => - ret += GpuColumnVector.from(castedCol.incRefCount(), dataTypes(i)) + + // a pre-processing step required before we go into the cuDF aggregate, in some cases + // casting and in others creating a struct (MERGE_M2 for instance, requires a struct) + val preStepBound = GpuBindReferences.bindGpuReferences(preStep, aggBufferAttributes) + withResource(GpuProjectExec.project(toAggregateBatch, preStepBound)) { preProcessed => + withResource(GpuColumnVector.from(preProcessed)) { preProcessedTbl => + val groupOptions = cudf.GroupByOptions.builder() + .withIgnoreNullKeys(false) + .withKeysSorted(isSorted) + .build() + + // perform the aggregate + withResource(preProcessedTbl + .groupBy(groupOptions, groupingExpressions.indices: _*) + .aggregate(cudfAggregates: _*)) { result => + withResource(GpuColumnVector.from(result, dataTypes.toArray)) { resultBatch => + // a post-processing step required in some scenarios, casting or picking + // apart a struct + val postStepBound = GpuBindReferences.bindGpuReferences(postStep, postStepAttr) + GpuProjectExec.project(resultBatch, postStepBound) } - ret } - new ColumnarBatch(resCols.toArray, result.getRowCount.toInt) } } } else { @@ -836,14 +861,14 @@ class GpuHashAggregateIterator( // we ask the appropriate merge or update CudfAggregates, what their // reduction merge or update aggregates functions are val cvs = mutable.ArrayBuffer[GpuColumnVector]() - aggModeCudfAggregates.foreach { case (mode, aggs) => + aggModeCudfAggregates.foreach { case (_, mode, aggs) => aggs.foreach { agg => val aggFn = if ((mode == Partial || mode == Complete) && !merge) { agg.updateReductionAggregate } else { agg.mergeReductionAggregate } - withResource(aggFn(toAggregateCvs(agg.getOrdinal(agg.ref)).getBase)) { res => + withResource(aggFn(GpuColumnVector.extractColumns(toAggregateBatch))) { res => val rapidsType = GpuColumnVector.getNonNestedRapidsType(agg.dataType) withResource(cudf.ColumnVector.fromScalar(res, 1)) { cv => cvs += GpuColumnVector.from(cv.castTo(rapidsType), agg.dataType) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 9304e8e3b23..dccce0b30f4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -16,22 +16,19 @@ package org.apache.spark.sql.rapids -import java.io.{ByteArrayInputStream, ObjectInputStream} - import ai.rapids.cudf -import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, ScanAggregation} +import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByAggregationOnColumn, GroupByScanAggregation, NullPolicy, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, ScanAggregation} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.v2._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExprId, ImplicitCastInputTypes, UnaryExpression, UnsafeArrayData, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExprId, ImplicitCastInputTypes, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, TypeUtils} +import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.Platform trait GpuAggregateFunction extends GpuExpression with ShimExpression @@ -49,6 +46,9 @@ trait GpuAggregateFunction extends GpuExpression /** Attributes of fields in aggBufferSchema. */ def aggBufferAttributes: Seq[AttributeReference] + /** This is the shape of merge aggregates, to which the postMerge binds to */ + def mergeBufferAttributes: Seq[AttributeReference] = aggBufferAttributes + /** * Result of the aggregate function when the input is empty. This is currently only used for the * proper rewriting of distinct aggregate functions. @@ -75,9 +75,24 @@ trait GpuAggregateFunction extends GpuExpression // update: first half of the aggregation (count = count) val updateExpressions: Seq[Expression] + // expression to use to modify pre and post a cuDF update aggregate + // preUpdate: modify an incoming batch to match the shape/type cuDF expects + // postUpdate and postUpdateAttr: take the output of a cuDF update aggregate and return + // what spark expects + lazy val preUpdate: Seq[Expression] = aggBufferAttributes + lazy val postUpdate: Seq[Expression] = aggBufferAttributes + lazy val postUpdateAttr: Seq[AttributeReference] = aggBufferAttributes + // merge: second half of the aggregation (count = sum). Also use to merge multiple batches. val mergeExpressions: Seq[GpuExpression] + // expression to use to modify pre and post a cudf merge aggregate + // preMerge: modify a partial batch to match the input required by a merge aggregate + // postMerge and postMergeAttr: used to put the result of the merge aggregate, in Spark terms. + lazy val preMerge: Seq[Expression] = aggBufferAttributes + lazy val postMerge: Seq[Expression] = aggBufferAttributes + lazy val postMergeAttr: Seq[AttributeReference] = aggBufferAttributes + // mostly likely a pass through (count => sum we merged above). // average has a more interesting expression to compute the division of sum/count val evaluateExpression: Expression @@ -125,6 +140,14 @@ case class WrappedAggFunction(aggregateFunction: GpuAggregateFunction, filter: E aggregateFunction.mergeExpressions override val evaluateExpression: Expression = aggregateFunction.evaluateExpression + + override lazy val preUpdate: Seq[Expression] = aggregateFunction.preUpdate + override lazy val postUpdate: Seq[Expression] = aggregateFunction.postUpdate + override lazy val postUpdateAttr: Seq[AttributeReference] = aggregateFunction.postUpdateAttr + + override lazy val preMerge: Seq[Expression] = aggregateFunction.preMerge + override lazy val postMerge: Seq[Expression] = aggregateFunction.postMerge + override lazy val postMergeAttr: Seq[AttributeReference] = aggregateFunction.postMergeAttr } case class GpuAggregateExpression(origAggregateFunction: GpuAggregateFunction, @@ -192,25 +215,43 @@ case class GpuAggregateExpression(origAggregateFunction: GpuAggregateFunction, abstract case class CudfAggregate(ref: Expression) extends GpuUnevaluable with ShimExpression { // we use this to get the ordinal of the bound reference, s.t. we can ask cudf to perform // the aggregate on that column - def getOrdinal(ref: Expression): Int = ref.asInstanceOf[GpuBoundReference].ordinal - val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar - val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar - val updateAggregate: GroupByAggregation - val mergeAggregate: GroupByAggregation + protected def getOrdinal(ref: Expression): Int = + ref.asInstanceOf[GpuBoundReference].ordinal + lazy val updateReductionAggregate: Seq[GpuColumnVector] => cudf.Scalar = + (cvs: Seq[GpuColumnVector]) => { + updateReductionAggregateInternal(cvs(getOrdinal(ref)).getBase) + } + + lazy val mergeReductionAggregate: Seq[GpuColumnVector]=> cudf.Scalar = + (cvs: Seq[GpuColumnVector]) => { + mergeReductionAggregateInternal(cvs(getOrdinal(ref)).getBase) + } + + val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar + val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar + val updateAggregate: GroupByAggregationOnColumn + val mergeAggregate: GroupByAggregationOnColumn def dataType: DataType = ref.dataType + def updateDataType: DataType = dataType def nullable: Boolean = ref.nullable - def children: Seq[Expression] = ref :: Nil + def children: Seq[Expression] = Seq(ref) } class CudfCount(ref: Expression) extends CudfAggregate(ref) { - override val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => cudf.Scalar.fromLong(col.getRowCount - col.getNullCount) - override val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.sum - override lazy val updateAggregate: GroupByAggregation = + override lazy val updateAggregate: GroupByAggregationOnColumn = GroupByAggregation.count(NullPolicy.EXCLUDE) - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.sum() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.sum() + .onColumn(getOrdinal(ref)) + + // the partial count outputs an int + override def updateDataType: DataType = IntegerType override def toString(): String = "CudfCount" } @@ -231,94 +272,129 @@ class CudfSum(ref: Expression) extends CudfAggregate(ref) { // sum(shorts): bigint // Aggregate [sum(shorts#33) AS sum(shorts)#50L] // - @transient val rapidsSumType: DType = GpuColumnVector.getNonNestedRapidsType(ref.dataType) + @transient val rapidsSumType: DType = GpuColumnVector.getNonNestedRapidsType(dataType) - override val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.sum(rapidsSumType) - override val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = updateReductionAggregate + override val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + updateReductionAggregateInternal - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.sum() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.sum() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.sum() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.sum() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfSum" + } class CudfMax(ref: Expression) extends CudfAggregate(ref) { - override val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.max - override val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.max - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.max() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.max() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.max() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.max() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfMax" } class CudfMin(ref: Expression) extends CudfAggregate(ref) { - override val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.min - override val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.min - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.min() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.min() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.min() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.min() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfMin" } class CudfCollectList(ref: Expression) extends CudfAggregate(ref) { - override lazy val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("CollectList is not yet supported in reduction") - override lazy val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("CollectList is not yet supported in reduction") - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.collectList() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.mergeLists() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.collectList() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeLists() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfCollectList" override def dataType: DataType = ArrayType(ref.dataType, containsNull = false) override def nullable: Boolean = false } class CudfMergeLists(ref: Expression) extends CudfAggregate(ref) { - override lazy val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("MergeLists is not yet supported in reduction") - override lazy val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("MergeLists is not yet supported in reduction") - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.mergeLists() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.mergeLists() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeLists() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeLists() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfMergeLists" + override def nullable: Boolean = false } class CudfCollectSet(ref: Expression) extends CudfAggregate(ref) { - override lazy val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("CollectSet is not yet supported in reduction") - override lazy val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("CollectSet is not yet supported in reduction") - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.collectSet() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.mergeSets() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.collectSet() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeSets() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfCollectSet" override def dataType: DataType = ArrayType(ref.dataType, containsNull = false) override def nullable: Boolean = false } class CudfMergeSets(ref: Expression) extends CudfAggregate(ref) { - override lazy val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("CudfMergeSets is not yet supported in reduction") - override lazy val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = throw new UnsupportedOperationException("CudfMergeSets is not yet supported in reduction") - override lazy val updateAggregate: GroupByAggregation = GroupByAggregation.mergeSets() - override lazy val mergeAggregate: GroupByAggregation = GroupByAggregation.mergeSets() + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeSets() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn= + GroupByAggregation.mergeSets() + .onColumn(getOrdinal(ref)) override def toString(): String = "CudfMergeSets" + override def dataType: DataType = ref.dataType + override def nullable: Boolean = false } abstract class CudfFirstLastBase(ref: Expression) extends CudfAggregate(ref) { val includeNulls: NullPolicy val offset: Int - override val updateReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.nth(offset, includeNulls)) - override val mergeReductionAggregate: cudf.ColumnVector => cudf.Scalar = + override val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.nth(offset, includeNulls)) - override lazy val updateAggregate: GroupByAggregation = + override lazy val updateAggregate: GroupByAggregationOnColumn = GroupByAggregation.nth(offset, includeNulls) - override lazy val mergeAggregate: GroupByAggregation = + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = GroupByAggregation.nth(offset, includeNulls) + .onColumn(getOrdinal(ref)) } class CudfFirstIncludeNulls(ref: Expression) extends CudfFirstLastBase(ref) { @@ -527,7 +603,7 @@ case class GpuSum(child: Expression, resultType: DataType) * * The final result would be: * - * type | x | y + * type | x | * -----+---+-- * b | 1 | 3 * a | 2 | null @@ -596,6 +672,10 @@ case class GpuCount(children: Seq[Expression]) extends GpuAggregateFunction override lazy val inputProjection: Seq[Expression] = Seq(children.head) override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfCount(cudfCount)) + + override lazy val postUpdate: Seq[Expression] = Seq(GpuCast(cudfCount, dataType)) + override lazy val postUpdateAttr: Seq[AttributeReference] = Seq(cudfCount) + override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfCount)) override lazy val evaluateExpression: Expression = cudfCount From 60cb3dbe4f636c544d0801469fa61bc902fa28e7 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 8 Sep 2021 19:56:52 -0500 Subject: [PATCH 02/13] Add basic implementation of GpuM2 Co-authored-by: Nghia Truong --- .../spark/sql/rapids/AggregateFunctions.scala | 156 ++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index dccce0b30f4..32cc0a008ab 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -417,6 +417,68 @@ class CudfLastExcludeNulls(ref: Expression) extends CudfFirstLastBase(ref) { override val offset: Int = -1 } +/** This is only used by the M2 class aggregates, do not confuse this with GpuAverage */ +class CudfMean(ref: Expression) extends CudfAggregate(ref) { + @transient val rapidsAvgType: DType = GpuColumnVector.getNonNestedRapidsType(dataType) + + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + throw new UnsupportedOperationException("Reduction averages not supported") + + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + throw new UnsupportedOperationException("Reduction averages not supported") + + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mean() + .onColumn(getOrdinal(ref)) + + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mean() + .onColumn(getOrdinal(ref)) + + override def toString(): String = "CudfMeanForM2" +} + +class CudfMergeM2(ref: Expression) + extends CudfAggregate(ref) { + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") + override lazy val updateAggregate: GroupByAggregationOnColumn = + throw new UnsupportedOperationException("Only merge is supported for CudfMergeM2") + + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeM2() + .onColumn(getOrdinal(ref)) + + override def toString(): String = "CudfMergeM2" + + override def dataType: DataType = + StructType( + StructField("n", IntegerType, nullable = true) :: + StructField("avg", DoubleType, nullable = true) :: + StructField("m2", DoubleType, nullable = true) :: Nil) + + override def nullable: Boolean = true +} + +class CudfM2(ref: Expression) extends CudfAggregate(ref) { + override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") + override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = + throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") + override lazy val updateAggregate: GroupByAggregationOnColumn = + GroupByAggregation.M2() + .onColumn(getOrdinal(ref)) + override lazy val mergeAggregate: GroupByAggregationOnColumn = + GroupByAggregation.mergeM2() + .onColumn(getOrdinal(ref)) + override def toString(): String = "CudfM2" + + override def dataType: DataType = DoubleType + override def nullable: Boolean = true +} + case class GpuMin(child: Expression) extends GpuAggregateFunction with GpuBatchedRunningWindowWithFixer with GpuAggregateWindowFunction @@ -1045,3 +1107,97 @@ case class GpuToCpuCollectBufferTransition( projection.apply(InternalRow.apply(arrayData)).getBytes } } + +abstract class GpuM2(child: Expression) + extends GpuAggregateFunction with ImplicitCastInputTypes with Serializable { + + override def dataType: DataType = DoubleType + override def nullable: Boolean = true + override def inputTypes: Seq[AbstractDataType] = Seq(NumericType) + + // Buffers for the update stage. + protected lazy val bufferN: AttributeReference = + AttributeReference("n", DoubleType, nullable = false)() + + protected lazy val bufferAvg: AttributeReference = + AttributeReference("avg", DoubleType, nullable = true)() + + protected lazy val bufferM2: AttributeReference = + AttributeReference("m2", DoubleType, nullable = true)() + + override lazy val aggBufferAttributes: Seq[AttributeReference] = + bufferN :: bufferAvg :: bufferM2 :: Nil + + override lazy val inputProjection: Seq[Expression] = Seq(child, child, child) + + override lazy val initialValues: Seq[GpuLiteral] = + Seq(GpuLiteral(0.0), GpuLiteral(0.0), GpuLiteral(0.0)) + + // For local update, we need to compute all 3 aggregates: count, sum, and M2. + override lazy val updateExpressions: Seq[Expression] = + new CudfCount(bufferN) :: + new CudfMean(bufferAvg) :: + new CudfM2(bufferM2) :: Nil + + override lazy val postUpdate: Seq[Expression] = { + // we copy the `bufferN` attribute and stomp on the type as Integer here, + // because we really do have an int, and the `DoubleType` is what we want to output + // to match Spark. So this expression says we are going from the aggregated count, + // and we are casting it double so that a final aggregate that is on the CPU can use it, + // TODO: we need to make bufferCount (n) a Long at least in cuDF + val bufferCountAsInt = bufferN.copy(dataType = IntegerType)( + bufferN.exprId, bufferN.qualifier) + GpuCast(bufferCountAsInt, DoubleType) :: + bufferAvg :: + bufferM2 :: Nil + } + + // before we merge we have 3 columns, we have to turn them into a struct + // this is a prior 3-column result of MERGE_M2 that was exploded out into 3 columns + // and reconstitutes a struct, which is necessary for MERGE_M2 + // + // We cast `n` to be an Integer, as that's what MERGE_M2 expects, note that Spark + // keeps `n` as a Double. + override lazy val preMerge: Seq[Expression] = { + val childrenWithNames = + GpuLiteral("n", StringType) :: GpuCast(bufferN, IntegerType) :: + GpuLiteral("avg", StringType) :: bufferAvg :: + GpuLiteral("m2", StringType) :: bufferM2 :: Nil + GpuCreateNamedStruct(childrenWithNames) :: Nil + } + + def mergeM2DataType: DataType = + StructType( + StructField("n", IntegerType, nullable = true) :: + StructField("avg", DoubleType, nullable = true) :: + StructField("m2", DoubleType, nullable = true) :: Nil) + + override def mergeBufferAttributes: Seq[AttributeReference] = postMergeAttr + + private val m2Struct = + AttributeReference("m2struct", mergeM2DataType, nullable = true)() + + override lazy val mergeExpressions: Seq[GpuExpression] = new CudfMergeM2(m2Struct) :: Nil + + // after a MERGE_M2 call in cudf, our result is 1 struct column + // we create this attribute to represent this result column + override lazy val postMergeAttr: Seq[AttributeReference] = Seq(m2Struct) + + // we will bind this expression against the attribute above in post-merge, + // and then project. The result will be 3 columns, where the first one is + // casted to Double to match Spark. + override lazy val postMerge: Seq[Expression] = Seq( + GpuCast(GpuGetStructField(m2Struct, 0), DoubleType), + GpuCast(GpuGetStructField(m2Struct, 1), DoubleType), + GpuCast(GpuGetStructField(m2Struct, 2), DoubleType)) +} + +case class GpuStddevPop(child: Expression) extends GpuM2(child) { + override lazy val evaluateExpression: GpuExpression = { + // Compute stddev_pop from M2s: stddev_pop = sqrt(M2 / n). + GpuSqrt(GpuDivide(bufferM2, bufferN, failOnErrorOverride = false)) + } + + override def children: Seq[Expression] = Seq(child) + override def prettyName: String = "stddev_pop" +} From 379ff8ac62b42379776139c0de614d7afd852947 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 8 Sep 2021 20:25:25 -0500 Subject: [PATCH 03/13] Use Expression consistently in GpuAggregateFunction --- .../spark/sql/rapids/AggregateFunctions.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 32cc0a008ab..705d565bab3 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -70,7 +70,7 @@ trait GpuAggregateFunction extends GpuExpression // them to initialize the aggregation buffer, and returns them in case // of an empty aggregate when there are no expressions, // here we copy them but with the gpu equivalent - val initialValues: Seq[GpuExpression] + val initialValues: Seq[Expression] // update: first half of the aggregation (count = count) val updateExpressions: Seq[Expression] @@ -84,7 +84,7 @@ trait GpuAggregateFunction extends GpuExpression lazy val postUpdateAttr: Seq[AttributeReference] = aggBufferAttributes // merge: second half of the aggregation (count = sum). Also use to merge multiple batches. - val mergeExpressions: Seq[GpuExpression] + val mergeExpressions: Seq[Expression] // expression to use to modify pre and post a cudf merge aggregate // preMerge: modify a partial batch to match the input required by a merge aggregate @@ -106,7 +106,7 @@ trait GpuAggregateFunction extends GpuExpression case class WrappedAggFunction(aggregateFunction: GpuAggregateFunction, filter: Expression) extends GpuAggregateFunction { - override val inputProjection: Seq[GpuExpression] = { + override val inputProjection: Seq[Expression] = { val caseWhenExpressions = aggregateFunction.inputProjection.map { ip => // special case average with null result from the filter as expected values should be // (0.0,0) for (sum, count) @@ -132,11 +132,11 @@ case class WrappedAggFunction(aggregateFunction: GpuAggregateFunction, filter: E override def children: Seq[Expression] = Seq(aggregateFunction, filter) - override val initialValues: Seq[GpuExpression] = + override val initialValues: Seq[Expression] = aggregateFunction.initialValues override val updateExpressions: Seq[Expression] = aggregateFunction.updateExpressions - override val mergeExpressions: Seq[GpuExpression] = + override val mergeExpressions: Seq[Expression] = aggregateFunction.mergeExpressions override val evaluateExpression: Expression = aggregateFunction.evaluateExpression @@ -486,8 +486,8 @@ case class GpuMin(child: Expression) extends GpuAggregateFunction private lazy val cudfMin = AttributeReference("min", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) - override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMin(cudfMin)) - override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfMin(cudfMin)) + override lazy val updateExpressions: Seq[Expression] = Seq(new CudfMin(cudfMin)) + override lazy val mergeExpressions: Seq[Expression] = Seq(new CudfMin(cudfMin)) override lazy val evaluateExpression: Expression = cudfMin override lazy val aggBufferAttributes: Seq[AttributeReference] = cudfMin :: Nil @@ -541,8 +541,8 @@ case class GpuMax(child: Expression) extends GpuAggregateFunction private lazy val cudfMax = AttributeReference("max", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) - override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax)) - override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax)) + override lazy val updateExpressions: Seq[Expression] = Seq(new CudfMax(cudfMax)) + override lazy val mergeExpressions: Seq[Expression] = Seq(new CudfMax(cudfMax)) override lazy val evaluateExpression: Expression = cudfMax override lazy val aggBufferAttributes: Seq[AttributeReference] = cudfMax :: Nil @@ -597,8 +597,8 @@ case class GpuSum(child: Expression, resultType: DataType) private lazy val cudfSum = AttributeReference("sum", resultType)() override lazy val inputProjection: Seq[Expression] = Seq(child) - override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum)) - override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum)) + override lazy val updateExpressions: Seq[Expression] = Seq(new CudfSum(cudfSum)) + override lazy val mergeExpressions: Seq[Expression] = Seq(new CudfSum(cudfSum)) override lazy val evaluateExpression: Expression = cudfSum override lazy val aggBufferAttributes: Seq[AttributeReference] = cudfSum :: Nil @@ -706,11 +706,11 @@ case class GpuPivotFirst( expr } - override lazy val updateExpressions: Seq[GpuExpression] = { + override lazy val updateExpressions: Seq[Expression] = { pivotColAttr.map(pivotColumnValue => new CudfLastExcludeNulls(pivotColumnValue)) } - override lazy val mergeExpressions: Seq[GpuExpression] = { + override lazy val mergeExpressions: Seq[Expression] = { pivotColAttr.map(pivotColumnValue => new CudfLastExcludeNulls(pivotColumnValue)) } @@ -733,12 +733,12 @@ case class GpuCount(children: Seq[Expression]) extends GpuAggregateFunction private lazy val cudfCount = AttributeReference("count", LongType)() override lazy val inputProjection: Seq[Expression] = Seq(children.head) - override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfCount(cudfCount)) + override lazy val updateExpressions: Seq[Expression] = Seq(new CudfCount(cudfCount)) override lazy val postUpdate: Seq[Expression] = Seq(GpuCast(cudfCount, dataType)) override lazy val postUpdateAttr: Seq[AttributeReference] = Seq(cudfCount) - override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfCount)) + override lazy val mergeExpressions: Seq[Expression] = Seq(new CudfSum(cudfCount)) override lazy val evaluateExpression: Expression = cudfCount override lazy val aggBufferAttributes: Seq[AttributeReference] = cudfCount :: Nil @@ -806,7 +806,7 @@ case class GpuAverage(child: Expression) extends GpuAggregateFunction GpuLiteral(litVal, DoubleType) } - override lazy val inputProjection: Seq[GpuExpression] = Seq( + override lazy val inputProjection: Seq[Expression] = Seq( child match { case literal: GpuLiteral => toDoubleLit(literal.value) case _ => GpuCoalesce(Seq(GpuCast(child, DoubleType), GpuLiteral(0D, DoubleType))) @@ -818,20 +818,20 @@ case class GpuAverage(child: Expression) extends GpuAggregateFunction // a sum of this == the count GpuCast(GpuIsNotNull(child), LongType) }) - override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum), + override lazy val mergeExpressions: Seq[Expression] = Seq(new CudfSum(cudfSum), new CudfSum(cudfCount)) // The count input projection will need to be collected as a sum (of counts) instead of // counts (of counts) as the GpuIsNotNull o/p is casted to count=0 for null and 1 otherwise, and // the total count can be correctly evaluated only by summing them. eg. avg(col(null, 27)) // should be 27, with count column projection as (0, 1) and total count for dividing the // average = (0 + 1) and not 2 which is the rowcount of the projected column. - override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum), + override lazy val updateExpressions: Seq[Expression] = Seq(new CudfSum(cudfSum), new CudfSum(cudfCount)) // NOTE: this sets `failOnErrorOverride=false` in `GpuDivide` to force it not to throw // divide-by-zero exceptions, even when ansi mode is enabled in Spark. // This is to conform with Spark's behavior in the Average aggregate function. - override lazy val evaluateExpression: GpuExpression = GpuDivide( + override lazy val evaluateExpression: Expression = GpuDivide( GpuCast(cudfSum, DoubleType), GpuCast(cudfCount, DoubleType), failOnErrorOverride = false) @@ -881,8 +881,8 @@ case class GpuFirst(child: Expression, ignoreNulls: Boolean) Seq(new CudfFirstIncludeNulls(cudfFirst), new CudfFirstIncludeNulls(valueSet)) } - override lazy val updateExpressions: Seq[GpuExpression] = commonExpressions - override lazy val mergeExpressions: Seq[GpuExpression] = commonExpressions + override lazy val updateExpressions: Seq[Expression] = commonExpressions + override lazy val mergeExpressions: Seq[Expression] = commonExpressions override lazy val evaluateExpression: Expression = cudfFirst override lazy val aggBufferAttributes: Seq[AttributeReference] = cudfFirst :: valueSet :: Nil @@ -926,8 +926,8 @@ case class GpuLast(child: Expression, ignoreNulls: Boolean) Seq(new CudfLastIncludeNulls(cudfLast), new CudfLastIncludeNulls(valueSet)) } - override lazy val updateExpressions: Seq[GpuExpression] = commonExpressions - override lazy val mergeExpressions: Seq[GpuExpression] = commonExpressions + override lazy val updateExpressions: Seq[Expression] = commonExpressions + override lazy val mergeExpressions: Seq[Expression] = commonExpressions override lazy val evaluateExpression: Expression = cudfLast override lazy val aggBufferAttributes: Seq[AttributeReference] = cudfLast :: valueSet :: Nil @@ -973,7 +973,7 @@ trait GpuCollectBase extends GpuAggregateFunction with GpuAggregateWindowFunctio override val windowInputProjection: Seq[Expression] = Seq(child) // Make them lazy to avoid being initialized when creating a GpuCollectOp. - override lazy val initialValues: Seq[GpuExpression] = throw new UnsupportedOperationException + override lazy val initialValues: Seq[Expression] = throw new UnsupportedOperationException override val inputProjection: Seq[Expression] = Seq(child) @@ -1002,9 +1002,9 @@ case class GpuCollectList( inputAggBufferOffset: Int = 0) extends GpuCollectBase { - override lazy val updateExpressions: Seq[GpuExpression] = new CudfCollectList(inputBuf) :: Nil + override lazy val updateExpressions: Seq[Expression] = new CudfCollectList(inputBuf) :: Nil - override lazy val mergeExpressions: Seq[GpuExpression] = new CudfMergeLists(outputBuf) :: Nil + override lazy val mergeExpressions: Seq[Expression] = new CudfMergeLists(outputBuf) :: Nil override lazy val evaluateExpression: Expression = outputBuf @@ -1029,9 +1029,9 @@ case class GpuCollectSet( inputAggBufferOffset: Int = 0) extends GpuCollectBase { - override lazy val updateExpressions: Seq[GpuExpression] = new CudfCollectSet(inputBuf) :: Nil + override lazy val updateExpressions: Seq[Expression] = new CudfCollectSet(inputBuf) :: Nil - override lazy val mergeExpressions: Seq[GpuExpression] = new CudfMergeSets(outputBuf) :: Nil + override lazy val mergeExpressions: Seq[Expression] = new CudfMergeSets(outputBuf) :: Nil override lazy val evaluateExpression: Expression = outputBuf @@ -1177,7 +1177,7 @@ abstract class GpuM2(child: Expression) private val m2Struct = AttributeReference("m2struct", mergeM2DataType, nullable = true)() - override lazy val mergeExpressions: Seq[GpuExpression] = new CudfMergeM2(m2Struct) :: Nil + override lazy val mergeExpressions: Seq[Expression] = new CudfMergeM2(m2Struct) :: Nil // after a MERGE_M2 call in cudf, our result is 1 struct column // we create this attribute to represent this result column @@ -1193,7 +1193,7 @@ abstract class GpuM2(child: Expression) } case class GpuStddevPop(child: Expression) extends GpuM2(child) { - override lazy val evaluateExpression: GpuExpression = { + override lazy val evaluateExpression: Expression = { // Compute stddev_pop from M2s: stddev_pop = sqrt(M2 / n). GpuSqrt(GpuDivide(bufferM2, bufferN, failOnErrorOverride = false)) } From 8c8200406deca46ad06fc54e3f4c8d13c8aa5ffa Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 8 Sep 2021 22:34:56 -0500 Subject: [PATCH 04/13] Clean up a few comments --- .../src/main/scala/com/nvidia/spark/rapids/aggregate.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 804b81a2c7f..60ed1c6e3e9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -124,6 +124,7 @@ object AggregateUtils { * Compute the aggregation modes and aggregate expressions for all aggregation expressions * @param aggExpressions the aggregate expressions * @param aggBufferAttributes attributes to be bound to the aggregate expressions + * @param mergeBufferAttributes merge attributes to be bound to the merge expressions */ def computeAggModeCudfAggregates( aggExpressions: Seq[GpuAggregateExpression], @@ -810,7 +811,6 @@ class GpuHashAggregateIterator( groupingExpressions.map(_.dataType) for ((aggExp, mode, aggregates) <- aggModeCudfAggregates) { - // bind pre-merge to the aggBufferAttributes (input) val aggFn = aggExp.aggregateFunction if ((mode == Partial || mode == Complete) && ! merge) { preStep ++= aggFn.preUpdate From d2ff0e17fcee45ec4120b5484854f14b60229984 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Thu, 9 Sep 2021 10:45:51 -0500 Subject: [PATCH 05/13] Cleanup bound cudf aggregate expressions --- .../com/nvidia/spark/rapids/aggregate.scala | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 60ed1c6e3e9..0f9912e6566 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -121,16 +121,15 @@ object AggregateUtils { } /** - * Compute the aggregation modes and aggregate expressions for all aggregation expressions + * Bind cuDF aggregate expressions depending on the aggregate expression mode * @param aggExpressions the aggregate expressions * @param aggBufferAttributes attributes to be bound to the aggregate expressions * @param mergeBufferAttributes merge attributes to be bound to the merge expressions */ - def computeAggModeCudfAggregates( + def computeBoundCudfAggregates( aggExpressions: Seq[GpuAggregateExpression], aggBufferAttributes: Seq[Attribute], - mergeBufferAttributes: Seq[Attribute]): Seq[( - GpuAggregateExpression, AggregateMode, Seq[CudfAggregate])] = { + mergeBufferAttributes: Seq[Attribute]): Seq[BoundCudfAggregate] = { // // update expressions are those performed on the raw input data // e.g. for count it's count, and for average it's sum and count. @@ -151,11 +150,15 @@ object AggregateUtils { GpuBindReferences.bindGpuReferences(mergeExpressionsSeq(modeIndex), mergeBufferAttributes) .asInstanceOf[Seq[CudfAggregate]] } - (expr, expr.mode, cudfAggregates) + BoundCudfAggregate(expr, cudfAggregates) } } } +case class BoundCudfAggregate( + aggExpression: GpuAggregateExpression, + boundCudfAggregate: Seq[CudfAggregate]) + /** Utility class to hold all of the metrics related to hash aggregation */ case class GpuHashAggregateMetrics( numOutputRows: GpuMetric, @@ -244,7 +247,7 @@ class GpuHashAggregateIterator( boundInputReferences: Seq[GpuExpression], boundFinalProjections: Option[Seq[GpuExpression]], boundResultReferences: Seq[Expression], - aggModeCudfAggregates: Seq[(GpuAggregateExpression, AggregateMode, Seq[CudfAggregate])]) + boundCudfAggregates: Seq[BoundCudfAggregate]) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => close())) @@ -308,7 +311,7 @@ class GpuHashAggregateIterator( } private def computeTargetMergeBatchSize(confTargetSize: Long): Long = { - val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._3) + val aggregates = boundExpressions.boundCudfAggregates.flatMap(_.boundCudfAggregate) val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly) } @@ -467,7 +470,7 @@ class GpuHashAggregateIterator( val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) val sorter = new GpuSorter(ordering, aggBufferAttributes) - val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._3) + val aggregates = boundExpressions.boundCudfAggregates.flatMap(_.boundCudfAggregate) val aggBatchTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) // Use the out of core sort iterator to sort the batches by grouping key @@ -633,8 +636,8 @@ class GpuHashAggregateIterator( val mergeBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.mergeBufferAttributes) - val aggModeCudfAggregates = - AggregateUtils.computeAggModeCudfAggregates( + val boundCudfAggregates = + AggregateUtils.computeBoundCudfAggregates( aggregateExpressions, aggBufferAttributes, mergeBufferAttributes) // boundInputReferences is used to pick out of the input batch the appropriate columns @@ -762,7 +765,7 @@ class GpuHashAggregateIterator( groupingAttributes) } BoundExpressionsModeAggregates(boundInputReferences, boundFinalProjections, - boundResultReferences, aggModeCudfAggregates) + boundResultReferences, boundCudfAggregates) } /** @@ -781,7 +784,7 @@ class GpuHashAggregateIterator( val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) - val aggModeCudfAggregates = boundExpressions.aggModeCudfAggregates + val boundCudfAggregates = boundExpressions.boundCudfAggregates val computeAggTime = metrics.computeAggTime withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime)) { _ => if (groupingExpressions.nonEmpty) { @@ -810,9 +813,9 @@ class GpuHashAggregateIterator( dataTypes ++= groupingExpressions.map(_.dataType) - for ((aggExp, mode, aggregates) <- aggModeCudfAggregates) { + for (BoundCudfAggregate(aggExp, aggregates) <- boundCudfAggregates) { val aggFn = aggExp.aggregateFunction - if ((mode == Partial || mode == Complete) && ! merge) { + if ((aggExp.mode == Partial || aggExp.mode == Complete) && ! merge) { preStep ++= aggFn.preUpdate postStep ++= aggFn.postUpdate postStepAttr ++= aggFn.postUpdateAttr @@ -823,7 +826,7 @@ class GpuHashAggregateIterator( } aggregates.map { a => - if ((mode == Partial || mode == Complete) && !merge) { + if ((aggExp.mode == Partial || aggExp.mode == Complete) && !merge) { cudfAggregates += a.updateAggregate dataTypes += a.updateDataType } else { @@ -861,9 +864,9 @@ class GpuHashAggregateIterator( // we ask the appropriate merge or update CudfAggregates, what their // reduction merge or update aggregates functions are val cvs = mutable.ArrayBuffer[GpuColumnVector]() - aggModeCudfAggregates.foreach { case (_, mode, aggs) => + boundCudfAggregates.foreach { case BoundCudfAggregate(aggExp, aggs) => aggs.foreach { agg => - val aggFn = if ((mode == Partial || mode == Complete) && !merge) { + val aggFn = if ((aggExp.mode == Partial || aggExp.mode == Complete) && !merge) { agg.updateReductionAggregate } else { agg.mergeReductionAggregate From 2f5d812d78657a961dd4bcce6572476b3b74d8ae Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 10 Sep 2021 00:32:12 -0500 Subject: [PATCH 06/13] groupingExpressions as NamedExpression --- .../scala/com/nvidia/spark/rapids/aggregate.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 0f9912e6566..541d45ffaeb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -216,7 +216,7 @@ object AggregateModeInfo { */ class GpuHashAggregateIterator( cbIter: Iterator[ColumnarBatch], - groupingExpressions: Seq[Expression], + groupingExpressions: Seq[NamedExpression], aggregateExpressions: Seq[GpuAggregateExpression], aggregateAttributes: Seq[Attribute], resultExpressions: Seq[NamedExpression], @@ -466,7 +466,7 @@ class GpuHashAggregateIterator( val shims = ShimLoader.getSparkShims val ordering = groupingExpressions.map(shims.sortOrder(_, Ascending, NullsFirst)) - val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) + val groupingAttributes = groupingExpressions.map(_.toAttribute) val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) val sorter = new GpuSorter(ordering, aggBufferAttributes) @@ -630,7 +630,7 @@ class GpuHashAggregateIterator( * expression in allExpressions */ private def setupReferences(childAttr: AttributeSeq): BoundExpressionsModeAggregates = { - val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) + val groupingAttributes = groupingExpressions.map(_.toAttribute) val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) val mergeBufferAttributes = groupingAttributes ++ @@ -779,7 +779,7 @@ class GpuHashAggregateIterator( toAggregateBatch: ColumnarBatch, merge: Boolean, isSorted: Boolean = false): ColumnarBatch = { - val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) + val groupingAttributes = groupingExpressions.map(_.toAttribute) val aggBufferAttributes = groupingAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) @@ -995,7 +995,7 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( override def convertToGpu(): GpuExec = { GpuHashAggregateExec( requiredChildDistributionExpressions.map(_.map(_.convertToGpu())), - groupingExpressions.map(_.convertToGpu()), + groupingExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], aggregateExpressions.map(_.convertToGpu()).asInstanceOf[Seq[GpuAggregateExpression]], aggregateAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], resultExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], @@ -1076,7 +1076,7 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega } GpuHashAggregateExec( requiredChildDistributionExpressions.map(_.map(_.convertToGpu())), - groupingExpressions.map(_.convertToGpu()), + groupingExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], aggregateExpressions.map(_.convertToGpu()).asInstanceOf[Seq[GpuAggregateExpression]], aggAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], retExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], @@ -1401,7 +1401,7 @@ class GpuObjectHashAggregateExecMeta( */ case class GpuHashAggregateExec( requiredChildDistributionExpressions: Option[Seq[Expression]], - groupingExpressions: Seq[Expression], + groupingExpressions: Seq[NamedExpression], aggregateExpressions: Seq[GpuAggregateExpression], aggregateAttributes: Seq[Attribute], resultExpressions: Seq[NamedExpression], From 1b7c5bed046bef3c039be8bce646905e918753be Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 10 Sep 2021 09:12:02 -0500 Subject: [PATCH 07/13] Add comments around BoundCudfAggregate --- .../com/nvidia/spark/rapids/aggregate.scala | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 541d45ffaeb..dc9cf2277de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -155,6 +155,30 @@ object AggregateUtils { } } +/** + * Structure containing the original expressions, and a seq of `CudfAggregate` + * that corresponds to such a `GpuAggregateExpression.` For example, a + * `GpuAverage` aggregate, means we have two `CudfAggregate` instances, one + * for the count and one for the sum (hence the sequence). + * + * `boundCudfAggregate` items have a reference that is bound to either the + * update or merge buffer attributes, so it can mean different things depending + * on the stage of the aggregate. + * + * For example, the `GpuM2` aggregate can have either a `CudfM2` or a `CudfMergeM2` + * `CudfAggregate`. The reference used for `CudfM2` is that of 3 columns + * (n, mean, m2), but the reference used for `CudfMergeM2` is that of a struct + * (m2struct). In other words, this is the shape cuDF expects. + * + * In the update case, `boundCudfAggregate` follows Spark, for the aggregates we have + * currently implemented. The update case must match the shape outputted by the preUpdate + * step. + * + * In the merge case, `boundCudfAggregate` shape needs to be the result of the preMerge + * step. In the case of `CudfMergeM2`, preMerge takes 3 columns and turns them + * into the desired struct. + * + */ case class BoundCudfAggregate( aggExpression: GpuAggregateExpression, boundCudfAggregate: Seq[CudfAggregate]) @@ -795,7 +819,7 @@ class GpuHashAggregateIterator( // For example: GpuAverage has an update version of: (CudfSum, CudfCount) // and CudfCount has an update version of AggregateOp.COUNT and a // merge version of AggregateOp.COUNT. - var dataTypes = new mutable.ArrayBuffer[DataType]() + val dataTypes = new mutable.ArrayBuffer[DataType]() val cudfAggregates = new mutable.ArrayBuffer[GroupByAggregationOnColumn]() // `GpuAggregateFunction` can add a pre and post step for update From 6fc81f867acd031d48103c12a513dbee90d94822 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 10 Sep 2021 09:54:04 -0500 Subject: [PATCH 08/13] Use the Table.concatenate approach, instead of per column --- .../com/nvidia/spark/rapids/aggregate.scala | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index dc9cf2277de..ba967ba54e3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -456,10 +456,7 @@ class GpuHashAggregateIterator( private def concatenateAndMerge( batches: mutable.ArrayBuffer[LazySpillableColumnarBatch]): LazySpillableColumnarBatch = { withResource(batches) { _ => - withResource(concatenateBatches(batches)) { concatVectors => - val concatBatch = new ColumnarBatch( - concatVectors.toArray, - concatVectors.head.getRowCount.toInt) + withResource(concatenateBatches(batches)) { concatBatch => withResource(computeAggregate(concatBatch, merge = true)) { mergedBatch => LazySpillableColumnarBatch(mergedBatch, metrics.spillCallback, "agg merged batch") } @@ -619,22 +616,22 @@ class GpuHashAggregateIterator( } /** - * Concatenates batches by concatenating the corresponding column vectors within the batches. + * Concatenates batches after extracting them from `LazySpillableColumnarBatch` * @note the input batches are not closed as part of this operation - * @param batchesToConcat batches to concatenate - * @return concatenated vectors that together represent the concatenated batch result + * @param spillableBatchesToConcat lazy spillable batches to concatenate + * @return concatenated batch result */ private def concatenateBatches( - batchesToConcat: mutable.ArrayBuffer[LazySpillableColumnarBatch]): Seq[GpuColumnVector] = { + spillableBatchesToConcat: mutable.ArrayBuffer[LazySpillableColumnarBatch]): ColumnarBatch = { val concatTime = metrics.concatTime withResource(new NvtxWithMetrics("concatenateBatches", NvtxColor.BLUE, concatTime)) { _ => - val numCols = batchesToConcat.head.numCols - (0 until numCols).safeMap { i => - val columnType = batchesToConcat.head.getBatch.column(i).dataType() - val columnsToConcat = batchesToConcat.map { - _.getBatch.column(i).asInstanceOf[GpuColumnVector].getBase - } - GpuColumnVector.from(cudf.ColumnVector.concatenate(columnsToConcat: _*), columnType) + val batchesToConcat = spillableBatchesToConcat.map(_.getBatch) + val numCols = batchesToConcat.head.numCols() + val dataTypes = (0 until numCols).map { + c => batchesToConcat.head.column(c).dataType + }.toArray + withResource(batchesToConcat.map(GpuColumnVector.from)) { tbl => + GpuColumnVector.from(cudf.Table.concatenate(tbl:_*), dataTypes) } } } From bfd307aafe7791d8af747f611e3828affd1a3d6e Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 10 Sep 2021 10:27:18 -0500 Subject: [PATCH 09/13] Fix typo in doc --- .../src/main/scala/com/nvidia/spark/rapids/aggregate.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index ba967ba54e3..555252facab 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -228,7 +228,7 @@ object AggregateModeInfo { * `buildSortFallbackIterator` is used to sort the aggregated batches by the grouping keys and * performs a final merge aggregation pass on the sorted batches. * - * @param cbIter iterator providing the nput columnar batches + * @param cbIter iterator providing the input columnar batches * @param groupingExpressions expressions used for producing the grouping keys * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations * @param aggregateAttributes attribute references to each aggregate expression From e3c5235e897799116ddc9b15932afd167eb714a8 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 10 Sep 2021 10:44:05 -0500 Subject: [PATCH 10/13] Type erasure fix --- .../com/nvidia/spark/rapids/aggregate.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 64a0594e12c..5843d341869 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -1035,10 +1035,10 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( override def convertToGpu(): GpuExec = { GpuHashAggregateExec( requiredChildDistributionExpressions.map(_.map(_.convertToGpu())), - groupingExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], - aggregateExpressions.map(_.convertToGpu()).asInstanceOf[Seq[GpuAggregateExpression]], - aggregateAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - resultExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], + groupingExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), + aggregateExpressions.map(_.convertToGpu().asInstanceOf[GpuAggregateExpression]), + aggregateAttributes.map(_.convertToGpu().asInstanceOf[Attribute]), + resultExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), childPlans.head.convertIfNeeded(), conf.gpuTargetBatchSizeBytes) } @@ -1116,10 +1116,10 @@ abstract class GpuTypedImperativeSupportedAggregateExecMeta[INPUT <: BaseAggrega } GpuHashAggregateExec( requiredChildDistributionExpressions.map(_.map(_.convertToGpu())), - groupingExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], - aggregateExpressions.map(_.convertToGpu()).asInstanceOf[Seq[GpuAggregateExpression]], - aggAttributes.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - retExpressions.map(_.convertToGpu()).asInstanceOf[Seq[NamedExpression]], + groupingExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), + aggregateExpressions.map(_.convertToGpu().asInstanceOf[GpuAggregateExpression]), + aggAttributes.map(_.convertToGpu().asInstanceOf[Attribute]), + retExpressions.map(_.convertToGpu().asInstanceOf[NamedExpression]), childPlans.head.convertIfNeeded(), conf.gpuTargetBatchSizeBytes) } else { From 3c05fe11256c46f1b2c41099cf31e3ec8137565f Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Fri, 10 Sep 2021 12:38:53 -0500 Subject: [PATCH 11/13] Add comment on computeBoundCudfAggregates --- .../main/scala/com/nvidia/spark/rapids/aggregate.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 5843d341869..30625b8f825 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -121,7 +121,13 @@ object AggregateUtils { } /** - * Bind cuDF aggregate expressions depending on the aggregate expression mode + * Bind cuDF aggregate expressions depending on the aggregate expression mode. + * This binds `CudfAggregate` instances that are needed to realize each `GpuAggregateExpression`, + * where the shape the `CudfAggregate` expect is different for update vs merge. + * + * The only difference right now is in `CudfMergeM2`, in all other cases aggBufferAttributes + * and mergeBufferAttributes are the same. `CudfMergeM2` wants a struct to be passed to cuDF + * `MERGE_M2`, hence we handle it differently. * @param aggExpressions the aggregate expressions * @param aggBufferAttributes attributes to be bound to the aggregate expressions * @param mergeBufferAttributes merge attributes to be bound to the merge expressions From a626e65ecb6e4e613ee1e41cce283fd613465887 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 14 Sep 2021 11:28:27 -0500 Subject: [PATCH 12/13] Fix leak when concatenating batches --- .../src/main/scala/com/nvidia/spark/rapids/aggregate.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 30625b8f825..90e0b2c1704 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -637,7 +637,9 @@ class GpuHashAggregateIterator( c => batchesToConcat.head.column(c).dataType }.toArray withResource(batchesToConcat.map(GpuColumnVector.from)) { tbl => - GpuColumnVector.from(cudf.Table.concatenate(tbl:_*), dataTypes) + withResource(cudf.Table.concatenate(tbl: _*)) { concatenated => + GpuColumnVector.from(concatenated, dataTypes) + } } } } From d4807c100f8c62c28d8c19c7897d588551917651 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 14 Sep 2021 15:15:23 -0500 Subject: [PATCH 13/13] Remove the M2 aggregate implementation --- .../spark/sql/rapids/AggregateFunctions.scala | 156 ------------------ 1 file changed, 156 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 4cebeeb4d99..3378c49a5d0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -418,68 +418,6 @@ class CudfLastExcludeNulls(ref: Expression) extends CudfFirstLastBase(ref) { override val offset: Int = -1 } -/** This is only used by the M2 class aggregates, do not confuse this with GpuAverage */ -class CudfMean(ref: Expression) extends CudfAggregate(ref) { - @transient val rapidsAvgType: DType = GpuColumnVector.getNonNestedRapidsType(dataType) - - override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = - throw new UnsupportedOperationException("Reduction averages not supported") - - override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = - throw new UnsupportedOperationException("Reduction averages not supported") - - override lazy val updateAggregate: GroupByAggregationOnColumn = - GroupByAggregation.mean() - .onColumn(getOrdinal(ref)) - - override lazy val mergeAggregate: GroupByAggregationOnColumn = - GroupByAggregation.mean() - .onColumn(getOrdinal(ref)) - - override def toString(): String = "CudfMeanForM2" -} - -class CudfMergeM2(ref: Expression) - extends CudfAggregate(ref) { - override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = - throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") - override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = - throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") - override lazy val updateAggregate: GroupByAggregationOnColumn = - throw new UnsupportedOperationException("Only merge is supported for CudfMergeM2") - - override lazy val mergeAggregate: GroupByAggregationOnColumn = - GroupByAggregation.mergeM2() - .onColumn(getOrdinal(ref)) - - override def toString(): String = "CudfMergeM2" - - override def dataType: DataType = - StructType( - StructField("n", IntegerType, nullable = true) :: - StructField("avg", DoubleType, nullable = true) :: - StructField("m2", DoubleType, nullable = true) :: Nil) - - override def nullable: Boolean = true -} - -class CudfM2(ref: Expression) extends CudfAggregate(ref) { - override lazy val updateReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = - throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") - override lazy val mergeReductionAggregateInternal: cudf.ColumnVector => cudf.Scalar = - throw new UnsupportedOperationException("M2 aggregation is not yet supported in reduction") - override lazy val updateAggregate: GroupByAggregationOnColumn = - GroupByAggregation.M2() - .onColumn(getOrdinal(ref)) - override lazy val mergeAggregate: GroupByAggregationOnColumn = - GroupByAggregation.mergeM2() - .onColumn(getOrdinal(ref)) - override def toString(): String = "CudfM2" - - override def dataType: DataType = DoubleType - override def nullable: Boolean = true -} - case class GpuMin(child: Expression) extends GpuAggregateFunction with GpuBatchedRunningWindowWithFixer with GpuAggregateWindowFunction @@ -1108,97 +1046,3 @@ case class GpuToCpuCollectBufferTransition( projection.apply(InternalRow.apply(arrayData)).getBytes } } - -abstract class GpuM2(child: Expression) - extends GpuAggregateFunction with ImplicitCastInputTypes with Serializable { - - override def dataType: DataType = DoubleType - override def nullable: Boolean = true - override def inputTypes: Seq[AbstractDataType] = Seq(NumericType) - - // Buffers for the update stage. - protected lazy val bufferN: AttributeReference = - AttributeReference("n", DoubleType, nullable = false)() - - protected lazy val bufferAvg: AttributeReference = - AttributeReference("avg", DoubleType, nullable = true)() - - protected lazy val bufferM2: AttributeReference = - AttributeReference("m2", DoubleType, nullable = true)() - - override lazy val aggBufferAttributes: Seq[AttributeReference] = - bufferN :: bufferAvg :: bufferM2 :: Nil - - override lazy val inputProjection: Seq[Expression] = Seq(child, child, child) - - override lazy val initialValues: Seq[GpuLiteral] = - Seq(GpuLiteral(0.0), GpuLiteral(0.0), GpuLiteral(0.0)) - - // For local update, we need to compute all 3 aggregates: count, sum, and M2. - override lazy val updateExpressions: Seq[Expression] = - new CudfCount(bufferN) :: - new CudfMean(bufferAvg) :: - new CudfM2(bufferM2) :: Nil - - override lazy val postUpdate: Seq[Expression] = { - // we copy the `bufferN` attribute and stomp on the type as Integer here, - // because we really do have an int, and the `DoubleType` is what we want to output - // to match Spark. So this expression says we are going from the aggregated count, - // and we are casting it double so that a final aggregate that is on the CPU can use it, - // TODO: we need to make bufferCount (n) a Long at least in cuDF - val bufferCountAsInt = bufferN.copy(dataType = IntegerType)( - bufferN.exprId, bufferN.qualifier) - GpuCast(bufferCountAsInt, DoubleType) :: - bufferAvg :: - bufferM2 :: Nil - } - - // before we merge we have 3 columns, we have to turn them into a struct - // this is a prior 3-column result of MERGE_M2 that was exploded out into 3 columns - // and reconstitutes a struct, which is necessary for MERGE_M2 - // - // We cast `n` to be an Integer, as that's what MERGE_M2 expects, note that Spark - // keeps `n` as a Double. - override lazy val preMerge: Seq[Expression] = { - val childrenWithNames = - GpuLiteral("n", StringType) :: GpuCast(bufferN, IntegerType) :: - GpuLiteral("avg", StringType) :: bufferAvg :: - GpuLiteral("m2", StringType) :: bufferM2 :: Nil - GpuCreateNamedStruct(childrenWithNames) :: Nil - } - - def mergeM2DataType: DataType = - StructType( - StructField("n", IntegerType, nullable = true) :: - StructField("avg", DoubleType, nullable = true) :: - StructField("m2", DoubleType, nullable = true) :: Nil) - - override def mergeBufferAttributes: Seq[AttributeReference] = postMergeAttr - - private val m2Struct = - AttributeReference("m2struct", mergeM2DataType, nullable = true)() - - override lazy val mergeExpressions: Seq[Expression] = new CudfMergeM2(m2Struct) :: Nil - - // after a MERGE_M2 call in cudf, our result is 1 struct column - // we create this attribute to represent this result column - override lazy val postMergeAttr: Seq[AttributeReference] = Seq(m2Struct) - - // we will bind this expression against the attribute above in post-merge, - // and then project. The result will be 3 columns, where the first one is - // casted to Double to match Spark. - override lazy val postMerge: Seq[Expression] = Seq( - GpuCast(GpuGetStructField(m2Struct, 0), DoubleType), - GpuCast(GpuGetStructField(m2Struct, 1), DoubleType), - GpuCast(GpuGetStructField(m2Struct, 2), DoubleType)) -} - -case class GpuStddevPop(child: Expression) extends GpuM2(child) { - override lazy val evaluateExpression: Expression = { - // Compute stddev_pop from M2s: stddev_pop = sqrt(M2 / n). - GpuSqrt(GpuDivide(bufferM2, bufferN, failOnErrorOverride = false)) - } - - override def children: Seq[Expression] = Seq(child) - override def prettyName: String = "stddev_pop" -}