From 04e6aaf4f4a29cc85ed078516e314b4ded020f6b Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Mon, 9 Aug 2021 16:27:17 +0800 Subject: [PATCH 1/6] fix Signed-off-by: sperlingxx --- .../src/main/python/hash_aggregate_test.py | 13 +++++++++++-- .../scala/com/nvidia/spark/rapids/aggregate.scala | 14 +++++++------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 56a593b901d..7c5688b4206 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -452,7 +452,16 @@ def test_hash_groupby_collect_with_single_distinct(data_gen): @incompat @pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn) def test_hash_groupby_single_distinct_collect(data_gen): - # test distinct collect with other aggregations + # test distinct collect + sql = """select a, + sort_array(collect_list(distinct b)), + sort_array(collect_set(distinct b)) + from tbl group by a""" + assert_gpu_and_cpu_are_equal_sql( + df_fun=lambda spark: gen_df(spark, data_gen, length=100), + table_name="tbl", sql=sql) + + # test distinct collect with nonDistinct aggregations sql = """select a, sort_array(collect_list(distinct b)), sort_array(collect_set(b)), @@ -471,7 +480,7 @@ def test_hash_groupby_single_distinct_collect(data_gen): @approximate_float @ignore_order(local=True) @allow_non_gpu('SortAggregateExec', - 'SortArray', 'Alias', 'Literal', 'First', 'If', 'EqualTo', 'Count', + 'SortArray', 'Alias', 'Literal', 'First', 'If', 'EqualTo', 'Count', 'Coalesce', 'CollectList', 'CollectSet', 'AggregateExpression') @incompat @pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 25bde0b259d..00b98101db4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -633,10 +633,10 @@ class GpuHashAggregateIterator( // - Final or PartialMerge-only mode: we pick the columns in the order as handed to us. // - Partial or Complete mode: we use the inputProjections val boundInputReferences = - if (modeInfo.hasPartialMergeMode && modeInfo.hasPartialMode) { - // The 3rd stage of AggWithOneDistinct, which combines (partial) reduce-side - // nonDistinctAggExpressions and map-side distinctAggExpressions. For this stage, we need to - // switch the position of distinctAttributes and nonDistinctAttributes. + if (modeInfo.uniqueModes.length > 1 && aggregateExpressions.exists(_.isDistinct)) { + // The 3rd stage of AggWithOneDistinct, which consists of nonDistinctAggExpressions for merge + // and distinctAggExpressions for update. For this stage, we need to switch the position of + // distinctAttributes and nonDistinctAttributes. 
// // The schema of the 2nd stage's outputs: // groupingAttributes ++ distinctAttributes ++ nonDistinctAggBufferAttributes @@ -667,7 +667,8 @@ class GpuHashAggregateIterator( val inputAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes GpuBindReferences.bindGpuReferences(inputProjections, inputAttributes) } else if (modeInfo.hasFinalMode || - (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { + (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1) || + modeInfo.uniqueModes.isEmpty) { // two possible conditions: // 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of // AggWithOneDistinct, which needs no input projections. Because the child outputs are // internal aggregation buffers, which are aligned for the final stage. // // 2. The 2nd stage (PartialMerge) of AggWithOneDistinct, which works like the final stage // taking the child outputs as inputs without any projections. GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) - } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode || - modeInfo.uniqueModes.isEmpty) { + } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode) { // The first aggregation stage (including Partial or Complete or no aggExpression), // whose child node is not an AggregateExec. Therefore, input projections are essential. val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions From c22e36d1f69af79fb13e3f12641dcf64b00daa6b Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Mon, 9 Aug 2021 19:14:42 +0800 Subject: [PATCH 2/6] update comments Signed-off-by: sperlingxx --- .../com/nvidia/spark/rapids/aggregate.scala | 64 ++++++++++++++----- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 00b98101db4..7596e551104 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -628,21 +628,43 @@ class GpuHashAggregateIterator( // boundInputReferences is used to pick out of the input batch the appropriate columns // for aggregation. // - // - PartialMerge with Partial mode: we use the inputProjections - // for Partial and non distinct merge expressions for PartialMerge. - // - Final or PartialMerge-only mode: we pick the columns in the order as handed to us. - // - Partial or Complete mode: we use the inputProjections + // - DistinctAggExpressions with nonDistinctAggExpressions in another mode: we switch the + // position of distinctAttributes and nonDistinctAttributes in childAttr. And we use the + // inputProjections for distinctAggExpressions. + // - Final mode, PartialMerge-only mode or no AggExpressions: we pick the columns in the order + // they are handed to us. + // - Partial mode or Complete mode: we use the inputProjections. val boundInputReferences = if (modeInfo.uniqueModes.length > 1 && aggregateExpressions.exists(_.isDistinct)) { - // The 3rd stage of AggWithOneDistinct, which consists of nonDistinctAggExpressions for merge - // and distinctAggExpressions for update. For this stage, we need to switch the position of - // distinctAttributes and nonDistinctAttributes. + // This block takes care of an AggregateExec which contains nonDistinctAggExpressions and + // distinctAggExpressions with different AggregateModes.
All nonDistinctAggExpressions share + // one mode and all distinctAggExpressions are in another mode. The specific mode varies in + // different Spark runtimes, so this block applies a general condition to adapt to different + // runtimes: // - // The schema of the 2nd stage's outputs: - // groupingAttributes ++ distinctAttributes ++ nonDistinctAggBufferAttributes + // 1. Apache Spark: The 3rd stage of AggWithOneDistinct + // The 3rd stage of AggWithOneDistinct consists of nonDistinctAggExpressions in + // PartialMerge mode and distinctAggExpressions in Partial mode. For this stage, we need to + // switch the position of distinctAttributes and nonDistinctAttributes if there exists at + // least one nonDistinctAggExpression, because the positions of distinctAttributes are ahead + // of nonDistinctAttributes in the output of the previous stage, since distinctAttributes are + // included in the groupingExpressions. + // To be specific, the schema of the 2nd stage's outputs is: + // (groupingAttributes ++ distinctAttributes) ++ nonDistinctAggBufferAttributes + // The schema of the 3rd stage's expressions is: + // groupingAttributes ++ nonDistinctAggExpressions(PartialMerge) ++ + // distinctAggExpressions(Partial) // - // The schema of the 3rd stage's expressions: - // nonDistinctMergeAggExpressions ++ distinctPartialAggExpressions + // 2. Databricks runtime: The final stage of AggWithOneDistinct + // Databricks runtime squeezes the 4-stage AggWithOneDistinct into 2 stages. Basically, it + // combines the 1st and 2nd stages into a "Partial" stage; and it combines the 3rd and 4th + // stages into a "Merge" stage. Similarly, nonDistinctAggExpressions are ahead of distinct + // ones in the layout of the "Merge" stage's expressions: + // groupingAttributes ++ nonDistinctAggExpressions(Final) ++ DistinctAggExpressions(Complete) + // Meanwhile, as in Apache Spark, distinctAttributes are ahead of nonDistinctAggBufferAttributes + // in the output schema of the "Partial" stage. + // Therefore, this block also works on the final stage of AggWithOneDistinct under the Databricks + // runtime. val (distinctAggExpressions, nonDistinctAggExpressions) = aggregateExpressions.partition( _.isDistinct) @@ -656,10 +678,11 @@ class GpuHashAggregateIterator( val distinctAttributes = childAttr.attrs.slice( groupingAttributes.length, childAttr.attrs.length - sizeOfNonDistAttr) - // With PartialMerge modes, we just pass through corresponding attributes of child plan into - // nonDistinctExpressions. + // For nonDistinctExpressions, they are in either PartialMerge or Final modes. With either + // mode, we just need to pass through childAttr. val nonDistinctExpressions = nonDistinctAttributes.asInstanceOf[Seq[Expression]] - // With Partial modes, the input projections are necessary for distinctExpressions. + // For distinctExpressions, they are in either Partial or Complete modes. With either mode, + // we need to apply the input projections to these AggExpressions. val distinctExpressions = distinctAggExpressions.flatMap(_.aggregateFunction.inputProjection) // Align the expressions of input projections and input attributes @@ -669,17 +692,24 @@ class GpuHashAggregateIterator( } else if (modeInfo.hasFinalMode || (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1) || modeInfo.uniqueModes.isEmpty) { - // two possible conditions: + // This block takes care of three possible conditions: // 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of // AggWithOneDistinct, which needs no input projections.
Because the child outputs are // internal aggregation buffers, which are aligned for the final stage. // // 2. The 2nd stage (PartialMerge) of AggWithOneDistinct, which works like the final stage // taking the child outputs as inputs without any projections. + // + // 3. Stages without any AggExpressions (only contain groupingExpressions) GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode) { - // The first aggregation stage (including Partial or Complete or no aggExpression), - // whose child node is not an AggregateExec. Therefore, input projections are essential. + // The first aggregation stage which contains AggExpressions (in either Partial or Complete + // mode). In this case, the input projections are essential. + // To be specific, there are three conditions matching this case: + // 1. The Partial (1st) stage of NoDistinctAgg + // 2. The Partial (1st) stage of AggWithOneDistinct + // 3. In Databricks runtime, the "Final" (2nd) stage of AggWithOneDistinct which only contains + // DistinctAggExpressions (without any nonDistinctAggExpressions) val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions .flatMap(_.aggregateFunction.inputProjection) GpuBindReferences.bindGpuReferences(inputProjections, childAttr) From 29db6d69f96bb7be48e8a30802e4855e830abb6c Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Mon, 9 Aug 2021 20:02:22 +0800 Subject: [PATCH 3/6] update Signed-off-by: sperlingxx --- .../com/nvidia/spark/rapids/aggregate.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 7596e551104..b342bc9fe3e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -690,28 +690,27 @@ class GpuHashAggregateIterator( val inputAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes GpuBindReferences.bindGpuReferences(inputProjections, inputAttributes) } else if (modeInfo.hasFinalMode || - (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1) || - modeInfo.uniqueModes.isEmpty) { - // This block takes care of three possible conditions: + (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { + // This block takes care of two possible conditions: // 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of // AggWithOneDistinct, which needs no input projections. Because the child outputs are // internal aggregation buffers, which are aligned for the final stage. - // // 2. The 2nd stage (PartialMerge) of AggWithOneDistinct, which works like the final stage // taking the child outputs as inputs without any projections. - // - // 3. Stages without any AggExpressions (only contain groupingExpressions) GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) - } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode) { + } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode || + modeInfo.uniqueModes.isEmpty) { // The first aggregation stage which contains AggExpressions (in either Partial or Complete // mode). In this case, the input projections are essential. 
- // To be specific, there are three conditions matching this case: + // To be specific, there are four conditions matching this case: // 1. The Partial (1st) stage of NoDistinctAgg // 2. The Partial (1st) stage of AggWithOneDistinct // 3. In Databricks runtime, the "Final" (2nd) stage of AggWithOneDistinct which only contains // DistinctAggExpressions (without any nonDistinctAggExpressions) + // + // In addition, this block also fits for aggregation stages without any AggExpressions. val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions - .flatMap(_.aggregateFunction.inputProjection) + .flatMap(_.aggregateFunction.inputProjection) GpuBindReferences.bindGpuReferences(inputProjections, childAttr) } else { // This branch should NOT be reached. From 165812591649d28b28575ed96e08d60ab83bf409 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 11 Aug 2021 14:46:34 +0800 Subject: [PATCH 4/6] fix scala style Signed-off-by: sperlingxx --- .../com/nvidia/spark/rapids/aggregate.scala | 89 ++++++++++--------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index b342bc9fe3e..0c738005d5b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -79,9 +79,10 @@ object AggregateUtils { /** * Computes a target input batch size based on the assumption that computation can consume up to * 4X the configured batch size. - * @param confTargetSize user-configured maximum desired batch size - * @param inputTypes input batch schema - * @param outputTypes output batch schema + * + * @param confTargetSize user-configured maximum desired batch size + * @param inputTypes input batch schema + * @param outputTypes output batch schema * @param isReductionOnly true if this is a reduction-only aggregation without grouping * @return maximum target batch size to keep computation under the 4X configured batch limit */ @@ -92,6 +93,7 @@ object AggregateUtils { isReductionOnly: Boolean): Long = { def typesToSize(types: Seq[DataType]): Long = types.map(GpuBatchUtils.estimateGpuMemory(_, nullable = false, rowCount = 1)).sum + val inputRowSize = typesToSize(inputTypes) val outputRowSize = typesToSize(outputTypes) // The cudf hash table implementation allocates four 32-bit integers per input row. @@ -120,7 +122,8 @@ object AggregateUtils { /** * Compute the aggregation modes and aggregate expressions for all aggregation expressions - * @param aggExpressions the aggregate expressions + * + * @param aggExpressions the aggregate expressions * @param aggBufferAttributes attributes to be bound to the aggregate expressions */ def computeAggModeCudfAggregates( @@ -196,14 +199,14 @@ object AggregateModeInfo { * `buildSortFallbackIterator` is used to sort the aggregated batches by the grouping keys and * performs a final merge aggregation pass on the sorted batches. 
* - * @param cbIter iterator providing the nput columnar batches - * @param groupingExpressions expressions used for producing the grouping keys - * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations - * @param aggregateAttributes attribute references to each aggregate expression - * @param resultExpressions output expression for the aggregation - * @param childOutput input attributes to identify the input columns from the input batches - * @param modeInfo identifies which aggregation modes are being used - * @param metrics metrics that will be updated during aggregation + * @param cbIter iterator providing the nput columnar batches + * @param groupingExpressions expressions used for producing the grouping keys + * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations + * @param aggregateAttributes attribute references to each aggregate expression + * @param resultExpressions output expression for the aggregation + * @param childOutput input attributes to identify the input columns from the input batches + * @param modeInfo identifies which aggregation modes are being used + * @param metrics metrics that will be updated during aggregation * @param configuredTargetBatchSize user-specified value for the targeted input batch size */ class GpuHashAggregateIterator( @@ -305,7 +308,7 @@ class GpuHashAggregateIterator( private def computeTargetMergeBatchSize(confTargetSize: Long): Long = { val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2) val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) - AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly) + AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes, isReductionOnly) } /** Aggregate all input batches and place the results in the aggregatedBatches queue. */ @@ -365,6 +368,7 @@ class GpuHashAggregateIterator( /** * Perform a single pass over the aggregated batches attempting to merge adjacent batches. + * * @return true if at least one merge operation occurred */ private def mergePass(): Boolean = { @@ -418,6 +422,7 @@ class GpuHashAggregateIterator( /** * Concatenate batches together and perform a merge aggregation on the result. The input batches * will be closed as part of this operation. + * * @param batches batches to concatenate and merge aggregate * @return lazy spillable batch which has NOT been marked spillable */ @@ -526,8 +531,8 @@ class GpuHashAggregateIterator( /** * Project a merged aggregated batch result to the layout that Spark expects * i.e.: select avg(foo) from bar group by baz will produce: - * Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] - * Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] + * Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] + * Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] */ private def finalProjectBatch(batch: ColumnarBatch): ColumnarBatch = { val aggTime = metrics.computeAggTime @@ -584,6 +589,7 @@ class GpuHashAggregateIterator( /** * Concatenates batches by concatenating the corresponding column vectors within the batches. 
+ * * @note the input batches are not closed as part of this operation * @param batchesToConcat batches to concatenate * @return concatenated vectors that together represent the concatenated batch result @@ -690,7 +696,7 @@ class GpuHashAggregateIterator( val inputAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes GpuBindReferences.bindGpuReferences(inputProjections, inputAttributes) } else if (modeInfo.hasFinalMode || - (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { + (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { // This block takes care of two possible conditions: // 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of // AggWithOneDistinct, which needs no input projections. Because the child outputs are @@ -699,7 +705,7 @@ class GpuHashAggregateIterator( // taking the child outputs as inputs without any projections. GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode || - modeInfo.uniqueModes.isEmpty) { + modeInfo.uniqueModes.isEmpty) { // The first aggregation stage which contains AggExpressions (in either Partial or Complete // mode). In this case, the input projections are essential. // To be specific, there are four conditions matching this case: @@ -710,7 +716,7 @@ class GpuHashAggregateIterator( // // In addition, this block also fits for aggregation stages without any AggExpressions. val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions - .flatMap(_.aggregateFunction.inputProjection) + .flatMap(_.aggregateFunction.inputProjection) GpuBindReferences.bindGpuReferences(inputProjections, childAttr) } else { // This branch should NOT be reached. @@ -755,15 +761,16 @@ class GpuHashAggregateIterator( /** * Compute the aggregations on the projected input columns. + * * @param toAggregateCvs column vectors representing the input batch to aggregate - * @param merge true indicates a merge aggregation should be performed - * @param isSorted true indicates the data is already sorted by the grouping keys + * @param merge true indicates a merge aggregation should be performed + * @param isSorted true indicates the data is already sorted by the grouping keys * @return aggregated batch */ private def computeAggregate( toAggregateCvs: Seq[GpuColumnVector], merge: Boolean, - isSorted: Boolean = false): ColumnarBatch = { + isSorted: Boolean = false): ColumnarBatch = { val aggModeCudfAggregates = boundExpressions.aggModeCudfAggregates val computeAggTime = metrics.computeAggTime withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime)) { _ => @@ -893,12 +900,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( override def tagPlanForGpu(): Unit = { agg.groupingExpressions - .find(_.dataType match { - case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true - case _ => false - }) - .foreach(_ => - willNotWorkOnGpu("Nested types in grouping expressions are not supported")) + .find(_.dataType match { + case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true + case _ => false + }) + .foreach(_ => + willNotWorkOnGpu("Nested types in grouping expressions are not supported")) if (agg.resultExpressions.isEmpty) { willNotWorkOnGpu("result expressions is empty") } @@ -1214,14 +1221,14 @@ class GpuObjectHashAggregateExecMeta( * * @param requiredChildDistributionExpressions this is unchanged by the GPU. 
It is used in * EnsureRequirements to be able to add shuffle nodes - * @param groupingExpressions The expressions that, when applied to the input batch, return the - * grouping key - * @param aggregateExpressions The GpuAggregateExpression instances for this node - * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) - * @param resultExpressions the expected output expression of this hash aggregate (which this - * node should project) - * @param child incoming plan (where we get input columns from) - * @param configuredTargetBatchSize user-configured maximum device memory size of a batch + * @param groupingExpressions The expressions that, when applied to the input batch, return the + * grouping key + * @param aggregateExpressions The GpuAggregateExpression instances for this node + * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) + * @param resultExpressions the expected output expression of this hash aggregate (which this + * node should project) + * @param child incoming plan (where we get input columns from) + * @param configuredTargetBatchSize user-configured maximum device memory size of a batch */ case class GpuHashAggregateExec( requiredChildDistributionExpressions: Option[Seq[Expression]], @@ -1238,7 +1245,7 @@ case class GpuHashAggregateExec( override lazy val additionalMetrics: Map[String, GpuMetric] = Map( NUM_TASKS_FALL_BACKED -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_TASKS_FALL_BACKED), AGG_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_AGG_TIME), - CONCAT_TIME-> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), + CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME) ) ++ spillMetrics @@ -1319,7 +1326,7 @@ case class GpuHashAggregateExec( protected def replaceAlias(attr: AttributeReference): Option[Attribute] = { outputExpressions.collectFirst { - case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) => + case a@Alias(child: AttributeReference, _) if child.semanticEquals(attr) => a.toAttribute } } @@ -1327,8 +1334,8 @@ case class GpuHashAggregateExec( // Used in de-duping and optimizer rules override def producedAttributes: AttributeSet = AttributeSet(aggregateAttributes) ++ - AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ - AttributeSet(aggregateBufferAttributes) + AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ + AttributeSet(aggregateBufferAttributes) // AllTuples = distribution with a single partition and all tuples of the dataset are co-located. 
// Clustered = dataset with tuples co-located in the same partition if they share a specific value @@ -1351,7 +1358,7 @@ case class GpuHashAggregateExec( */ override lazy val allAttributes: AttributeSeq = child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++ - aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) + aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields) @@ -1369,7 +1376,7 @@ case class GpuHashAggregateExec( s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { s"GpuHashAggregate(keys=$keyString, functions=$functionString)," + - s" filters=${aggregateExpressions.map(_.filter)})" + s" filters=${aggregateExpressions.map(_.filter)})" } } // From 875d7a3cc546aa318b00c29130918b40ddc26705 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 11 Aug 2021 15:09:50 +0800 Subject: [PATCH 5/6] Revert "fix scala style" This reverts commit 165812591649d28b28575ed96e08d60ab83bf409. --- .../com/nvidia/spark/rapids/aggregate.scala | 89 +++++++++---------- 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 0c738005d5b..b342bc9fe3e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -79,10 +79,9 @@ object AggregateUtils { /** * Computes a target input batch size based on the assumption that computation can consume up to * 4X the configured batch size. - * - * @param confTargetSize user-configured maximum desired batch size - * @param inputTypes input batch schema - * @param outputTypes output batch schema + * @param confTargetSize user-configured maximum desired batch size + * @param inputTypes input batch schema + * @param outputTypes output batch schema * @param isReductionOnly true if this is a reduction-only aggregation without grouping * @return maximum target batch size to keep computation under the 4X configured batch limit */ @@ -93,7 +92,6 @@ object AggregateUtils { isReductionOnly: Boolean): Long = { def typesToSize(types: Seq[DataType]): Long = types.map(GpuBatchUtils.estimateGpuMemory(_, nullable = false, rowCount = 1)).sum - val inputRowSize = typesToSize(inputTypes) val outputRowSize = typesToSize(outputTypes) // The cudf hash table implementation allocates four 32-bit integers per input row. @@ -122,8 +120,7 @@ object AggregateUtils { /** * Compute the aggregation modes and aggregate expressions for all aggregation expressions - * - * @param aggExpressions the aggregate expressions + * @param aggExpressions the aggregate expressions * @param aggBufferAttributes attributes to be bound to the aggregate expressions */ def computeAggModeCudfAggregates( @@ -199,14 +196,14 @@ object AggregateModeInfo { * `buildSortFallbackIterator` is used to sort the aggregated batches by the grouping keys and * performs a final merge aggregation pass on the sorted batches. 
* - * @param cbIter iterator providing the nput columnar batches - * @param groupingExpressions expressions used for producing the grouping keys - * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations - * @param aggregateAttributes attribute references to each aggregate expression - * @param resultExpressions output expression for the aggregation - * @param childOutput input attributes to identify the input columns from the input batches - * @param modeInfo identifies which aggregation modes are being used - * @param metrics metrics that will be updated during aggregation + * @param cbIter iterator providing the nput columnar batches + * @param groupingExpressions expressions used for producing the grouping keys + * @param aggregateExpressions GPU aggregate expressions used to produce the aggregations + * @param aggregateAttributes attribute references to each aggregate expression + * @param resultExpressions output expression for the aggregation + * @param childOutput input attributes to identify the input columns from the input batches + * @param modeInfo identifies which aggregation modes are being used + * @param metrics metrics that will be updated during aggregation * @param configuredTargetBatchSize user-specified value for the targeted input batch size */ class GpuHashAggregateIterator( @@ -308,7 +305,7 @@ class GpuHashAggregateIterator( private def computeTargetMergeBatchSize(confTargetSize: Long): Long = { val aggregates = boundExpressions.aggModeCudfAggregates.flatMap(_._2) val mergedTypes = groupingExpressions.map(_.dataType) ++ aggregates.map(_.dataType) - AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes, isReductionOnly) + AggregateUtils.computeTargetBatchSize(confTargetSize, mergedTypes, mergedTypes,isReductionOnly) } /** Aggregate all input batches and place the results in the aggregatedBatches queue. */ @@ -368,7 +365,6 @@ class GpuHashAggregateIterator( /** * Perform a single pass over the aggregated batches attempting to merge adjacent batches. - * * @return true if at least one merge operation occurred */ private def mergePass(): Boolean = { @@ -422,7 +418,6 @@ class GpuHashAggregateIterator( /** * Concatenate batches together and perform a merge aggregation on the result. The input batches * will be closed as part of this operation. - * * @param batches batches to concatenate and merge aggregate * @return lazy spillable batch which has NOT been marked spillable */ @@ -531,8 +526,8 @@ class GpuHashAggregateIterator( /** * Project a merged aggregated batch result to the layout that Spark expects * i.e.: select avg(foo) from bar group by baz will produce: - * Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] - * Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] + * Partial mode: 3 columns => [bar, sum(foo) as sum_foo, count(foo) as count_foo] + * Final mode: 2 columns => [bar, sum(sum_foo) / sum(count_foo)] */ private def finalProjectBatch(batch: ColumnarBatch): ColumnarBatch = { val aggTime = metrics.computeAggTime @@ -589,7 +584,6 @@ class GpuHashAggregateIterator( /** * Concatenates batches by concatenating the corresponding column vectors within the batches. 
- * * @note the input batches are not closed as part of this operation * @param batchesToConcat batches to concatenate * @return concatenated vectors that together represent the concatenated batch result @@ -696,7 +690,7 @@ class GpuHashAggregateIterator( val inputAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes GpuBindReferences.bindGpuReferences(inputProjections, inputAttributes) } else if (modeInfo.hasFinalMode || - (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { + (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { // This block takes care of two possible conditions: // 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of // AggWithOneDistinct, which needs no input projections. Because the child outputs are @@ -705,7 +699,7 @@ class GpuHashAggregateIterator( // taking the child outputs as inputs without any projections. GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode || - modeInfo.uniqueModes.isEmpty) { + modeInfo.uniqueModes.isEmpty) { // The first aggregation stage which contains AggExpressions (in either Partial or Complete // mode). In this case, the input projections are essential. // To be specific, there are four conditions matching this case: @@ -716,7 +710,7 @@ class GpuHashAggregateIterator( // // In addition, this block also fits for aggregation stages without any AggExpressions. val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions - .flatMap(_.aggregateFunction.inputProjection) + .flatMap(_.aggregateFunction.inputProjection) GpuBindReferences.bindGpuReferences(inputProjections, childAttr) } else { // This branch should NOT be reached. @@ -761,16 +755,15 @@ class GpuHashAggregateIterator( /** * Compute the aggregations on the projected input columns. - * * @param toAggregateCvs column vectors representing the input batch to aggregate - * @param merge true indicates a merge aggregation should be performed - * @param isSorted true indicates the data is already sorted by the grouping keys + * @param merge true indicates a merge aggregation should be performed + * @param isSorted true indicates the data is already sorted by the grouping keys * @return aggregated batch */ private def computeAggregate( toAggregateCvs: Seq[GpuColumnVector], merge: Boolean, - isSorted: Boolean = false): ColumnarBatch = { + isSorted: Boolean = false): ColumnarBatch = { val aggModeCudfAggregates = boundExpressions.aggModeCudfAggregates val computeAggTime = metrics.computeAggTime withResource(new NvtxWithMetrics("computeAggregate", NvtxColor.CYAN, computeAggTime)) { _ => @@ -900,12 +893,12 @@ abstract class GpuBaseAggregateMeta[INPUT <: SparkPlan]( override def tagPlanForGpu(): Unit = { agg.groupingExpressions - .find(_.dataType match { - case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true - case _ => false - }) - .foreach(_ => - willNotWorkOnGpu("Nested types in grouping expressions are not supported")) + .find(_.dataType match { + case _@(ArrayType(_, _) | MapType(_, _, _)) | _@StructType(_) => true + case _ => false + }) + .foreach(_ => + willNotWorkOnGpu("Nested types in grouping expressions are not supported")) if (agg.resultExpressions.isEmpty) { willNotWorkOnGpu("result expressions is empty") } @@ -1221,14 +1214,14 @@ class GpuObjectHashAggregateExecMeta( * * @param requiredChildDistributionExpressions this is unchanged by the GPU. 
It is used in * EnsureRequirements to be able to add shuffle nodes - * @param groupingExpressions The expressions that, when applied to the input batch, return the - * grouping key - * @param aggregateExpressions The GpuAggregateExpression instances for this node - * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) - * @param resultExpressions the expected output expression of this hash aggregate (which this - * node should project) - * @param child incoming plan (where we get input columns from) - * @param configuredTargetBatchSize user-configured maximum device memory size of a batch + * @param groupingExpressions The expressions that, when applied to the input batch, return the + * grouping key + * @param aggregateExpressions The GpuAggregateExpression instances for this node + * @param aggregateAttributes References to each GpuAggregateExpression (attribute references) + * @param resultExpressions the expected output expression of this hash aggregate (which this + * node should project) + * @param child incoming plan (where we get input columns from) + * @param configuredTargetBatchSize user-configured maximum device memory size of a batch */ case class GpuHashAggregateExec( requiredChildDistributionExpressions: Option[Seq[Expression]], @@ -1245,7 +1238,7 @@ case class GpuHashAggregateExec( override lazy val additionalMetrics: Map[String, GpuMetric] = Map( NUM_TASKS_FALL_BACKED -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_TASKS_FALL_BACKED), AGG_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_AGG_TIME), - CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), + CONCAT_TIME-> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME) ) ++ spillMetrics @@ -1326,7 +1319,7 @@ case class GpuHashAggregateExec( protected def replaceAlias(attr: AttributeReference): Option[Attribute] = { outputExpressions.collectFirst { - case a@Alias(child: AttributeReference, _) if child.semanticEquals(attr) => + case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) => a.toAttribute } } @@ -1334,8 +1327,8 @@ case class GpuHashAggregateExec( // Used in de-duping and optimizer rules override def producedAttributes: AttributeSet = AttributeSet(aggregateAttributes) ++ - AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ - AttributeSet(aggregateBufferAttributes) + AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ + AttributeSet(aggregateBufferAttributes) // AllTuples = distribution with a single partition and all tuples of the dataset are co-located. 
// Clustered = dataset with tuples co-located in the same partition if they share a specific value @@ -1358,7 +1351,7 @@ case class GpuHashAggregateExec( */ override lazy val allAttributes: AttributeSeq = child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++ - aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) + aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields) @@ -1376,7 +1369,7 @@ case class GpuHashAggregateExec( s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { s"GpuHashAggregate(keys=$keyString, functions=$functionString)," + - s" filters=${aggregateExpressions.map(_.filter)})" + s" filters=${aggregateExpressions.map(_.filter)})" } } // From b2ea92682519d1b44eeda81b059c23631e2a84d2 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 11 Aug 2021 15:21:48 +0800 Subject: [PATCH 6/6] fix scala style Signed-off-by: sperlingxx --- .../scala/com/nvidia/spark/rapids/aggregate.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index b342bc9fe3e..a7826f935fb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -690,7 +690,7 @@ class GpuHashAggregateIterator( val inputAttributes = groupingAttributes ++ distinctAttributes ++ nonDistinctAttributes GpuBindReferences.bindGpuReferences(inputProjections, inputAttributes) } else if (modeInfo.hasFinalMode || - (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { + (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) { // This block takes care of two possible conditions: // 1. The Final stage, including the 2nd stage of NoDistinctAgg and 4th stage of // AggWithOneDistinct, which needs no input projections. Because the child outputs are @@ -699,7 +699,7 @@ class GpuHashAggregateIterator( // taking the child outputs as inputs without any projections. GpuBindReferences.bindGpuReferences(childAttr.attrs.asInstanceOf[Seq[Expression]], childAttr) } else if (modeInfo.hasPartialMode || modeInfo.hasCompleteMode || - modeInfo.uniqueModes.isEmpty) { + modeInfo.uniqueModes.isEmpty) { // The first aggregation stage which contains AggExpressions (in either Partial or Complete // mode). In this case, the input projections are essential. // To be specific, there are four conditions matching this case: @@ -710,7 +710,7 @@ class GpuHashAggregateIterator( // // In addition, this block also fits for aggregation stages without any AggExpressions. val inputProjections: Seq[Expression] = groupingExpressions ++ aggregateExpressions - .flatMap(_.aggregateFunction.inputProjection) + .flatMap(_.aggregateFunction.inputProjection) GpuBindReferences.bindGpuReferences(inputProjections, childAttr) } else { // This branch should NOT be reached. 
@@ -1238,7 +1238,7 @@ case class GpuHashAggregateExec( override lazy val additionalMetrics: Map[String, GpuMetric] = Map( NUM_TASKS_FALL_BACKED -> createMetric(MODERATE_LEVEL, DESCRIPTION_NUM_TASKS_FALL_BACKED), AGG_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_AGG_TIME), - CONCAT_TIME-> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), + CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME), SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME) ) ++ spillMetrics @@ -1327,8 +1327,8 @@ case class GpuHashAggregateExec( // Used in de-duping and optimizer rules override def producedAttributes: AttributeSet = AttributeSet(aggregateAttributes) ++ - AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ - AttributeSet(aggregateBufferAttributes) + AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ + AttributeSet(aggregateBufferAttributes) // AllTuples = distribution with a single partition and all tuples of the dataset are co-located. // Clustered = dataset with tuples co-located in the same partition if they share a specific value @@ -1369,7 +1369,7 @@ case class GpuHashAggregateExec( s"GpuHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { s"GpuHashAggregate(keys=$keyString, functions=$functionString)," + - s" filters=${aggregateExpressions.map(_.filter)})" + s" filters=${aggregateExpressions.map(_.filter)})" } } //
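
Reviewer note (not part of the patches): the series above converges on a three-way dispatch over the AggregateModes present in a stage. As a compact illustration, below is a minimal, self-contained Scala sketch of that dispatch in the final state of the series (after PATCH 3/6 and 6/6). All names here (BindingDispatchSketch, ModeInfo, bindingStrategy) are hypothetical stand-ins rather than plugin APIs, and the defensive "should NOT be reached" branch is folded into the last case.

object BindingDispatchSketch {
  sealed trait Mode
  case object Partial extends Mode
  case object PartialMerge extends Mode
  case object Final extends Mode
  case object Complete extends Mode

  // Stand-in for the plugin's modeInfo: the distinct AggregateModes present in a stage.
  case class ModeInfo(uniqueModes: Seq[Mode]) {
    def hasPartialMode: Boolean = uniqueModes.contains(Partial)
    def hasPartialMergeMode: Boolean = uniqueModes.contains(PartialMerge)
    def hasFinalMode: Boolean = uniqueModes.contains(Final)
    def hasCompleteMode: Boolean = uniqueModes.contains(Complete)
  }

  def bindingStrategy(modeInfo: ModeInfo, hasDistinct: Boolean): String =
    if (modeInfo.uniqueModes.length > 1 && hasDistinct) {
      // Mixed modes plus a distinct aggregate: the 3rd stage of AggWithOneDistinct on
      // Apache Spark, or the combined "Merge" stage on the Databricks runtime.
      "switch distinct/non-distinct attribute positions"
    } else if (modeInfo.hasFinalMode ||
        (modeInfo.hasPartialMergeMode && modeInfo.uniqueModes.length == 1)) {
      // Final stage, or a PartialMerge-only stage: the child outputs are aggregation
      // buffers already laid out as required, so pass them through without projections.
      "pass through child attributes"
    } else {
      // Partial or Complete first stage, or a stage with no aggregate expressions at
      // all (uniqueModes is empty): apply the aggregate functions' input projections.
      "apply input projections"
    }

  def main(args: Array[String]): Unit = {
    println(bindingStrategy(ModeInfo(Seq(PartialMerge, Partial)), hasDistinct = true))
    println(bindingStrategy(ModeInfo(Seq(Final)), hasDistinct = false))
    println(bindingStrategy(ModeInfo(Seq.empty), hasDistinct = false))
    // -> switch distinct/non-distinct attribute positions
    // -> pass through child attributes
    // -> apply input projections
  }
}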
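Reviewer note (not part of the patches): the comments in PATCH 2/6 describe how, for the 3rd stage of AggWithOneDistinct, the positions of the distinct and non-distinct attributes must be switched between the 2nd stage's output and the 3rd stage's expressions. The following minimal sketch, with hypothetical names and plain Ints standing in for Catalyst attributes, computes which child column each 3rd-stage expression slot reads; it only illustrates the layout described in those comments, not the plugin's actual binding code (which goes through GpuBindReferences).

object DistinctReorderSketch {
  // Child (2nd stage) output layout: grouping ++ distinct ++ nonDistinctBuffers
  // 3rd stage expression layout:     grouping ++ nonDistinct(PartialMerge) ++ distinct(Partial)
  def boundInputOrdinals(numGrouping: Int, numDistinct: Int, numNonDistinct: Int): Seq[Int] = {
    val grouping = 0 until numGrouping
    val distinct = numGrouping until (numGrouping + numDistinct)
    val nonDistinct = (numGrouping + numDistinct) until (numGrouping + numDistinct + numNonDistinct)
    // Grouping columns stay first; the non-distinct buffers are merged next; the distinct
    // columns are projected last, mirroring the switched expression order above.
    grouping ++ nonDistinct ++ distinct
  }

  def main(args: Array[String]): Unit = {
    // e.g. select a, collect_list(distinct b), count(c) from tbl group by a
    // 2nd stage output: [a, b, count_buf]  =>  3rd stage reads [a, count_buf, b]
    println(boundInputOrdinals(numGrouping = 1, numDistinct = 1, numNonDistinct = 1))
    // -> Vector(0, 2, 1)
  }
}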