From 02ff24e0f7735147d717c818daa51fb290fc14e5 Mon Sep 17 00:00:00 2001 From: Nghia Truong <7416935+ttnghia@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:36:32 -0700 Subject: [PATCH] Implement `percentile` aggregation (#9296) Signed-off-by: Nghia Truong --- .../advanced_configs.md | 1 + docs/supported_ops.md | 278 +++++++++++++---- .../src/main/python/hash_aggregate_test.py | 165 +++++++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 75 ++++- .../sql/rapids/aggregate/GpuPercentile.scala | 280 ++++++++++++++++++ tools/generated_files/operatorsScore.csv | 1 + tools/generated_files/supportedExprs.csv | 8 + 7 files changed, 752 insertions(+), 56 deletions(-) create mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index 883aab24cdb..0ce1a964ca7 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -383,6 +383,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Last|`last`, `last_value`|last aggregate operator|true|None| spark.rapids.sql.expression.Max|`max`|Max aggregate operator|true|None| spark.rapids.sql.expression.Min|`min`|Min aggregate operator|true|None| +spark.rapids.sql.expression.Percentile|`percentile`|Aggregation computing exact percentile|true|None| spark.rapids.sql.expression.PivotFirst| |PivotFirst operator|true|None| spark.rapids.sql.expression.StddevPop|`stddev_pop`|Aggregation computing population standard deviation|true|None| spark.rapids.sql.expression.StddevSamp|`stddev_samp`, `std`, `stddev`|Aggregation computing sample standard deviation|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 48949ab00ef..f0358163cc0 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -17629,6 +17629,180 @@ are limited. UDT +Percentile +`percentile` +Aggregation computing exact percentile +None +aggregation +input + +S +S +S +S +S +S + + + + + + + + + + + + + +percentage + + + + + + +PS
Literal value only
+ + + + + + + +S + + + + + +frequency + + + + +S + + + + + + + + + +S + + + + + +result + + + + + + +S + + + + + + + +S + + + + + +reduction +input + +S +S +S +S +S +S + + + + + + + + + + + + + +percentage + + + + + + +PS
Literal value only
+ + + + + + + +S + + + + + +frequency + + + + +S + + + + + + + + + +S + + + + + +result + + + + + + +S + + + + + + + +S + + + + + PivotFirst PivotFirst operator @@ -17894,6 +18068,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + StddevSamp `stddev_samp`, `std`, `stddev` Aggregation computing sample standard deviation @@ -18027,32 +18227,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Sum `sum` Sum aggregate operator @@ -18319,6 +18493,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + VarianceSamp `var_samp`, `variance` Aggregation computing sample variance @@ -18452,32 +18652,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - NormalizeNaNAndZero Normalize NaN and zero diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 4f58278360c..a9300a51c79 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import pytest from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_row_counts_equal,\ @@ -883,6 +884,170 @@ def test_hash_groupby_collect_partial_replace_with_distinct_fallback(data_gen, group by a""", conf=conf) + +exact_percentile_data_gen = [ByteGen(), ShortGen(), IntegerGen(), LongGen(), FloatGen(), DoubleGen(), + RepeatSeqGen(ByteGen(), length=100), + RepeatSeqGen(ShortGen(), length=100), + RepeatSeqGen(IntegerGen(), length=100), + RepeatSeqGen(LongGen(), length=100), + RepeatSeqGen(FloatGen(), length=100), + RepeatSeqGen(DoubleGen(), length=100), + FloatGen().with_special_case(math.nan, 500.0) + .with_special_case(math.inf, 500.0), + DoubleGen().with_special_case(math.nan, 500.0) + .with_special_case(math.inf, 500.0)] + +exact_percentile_reduction_data_gen = [ + [('val', data_gen), + ('freq', LongGen(min_val=0, max_val=1000000, nullable=False) + .with_special_case(0, weight=100))] + for data_gen in exact_percentile_data_gen] + +def exact_percentile_reduction(df): + return df.selectExpr( + 'percentile(val, 0.1)', + 'percentile(val, 0)', + 'percentile(val, 1)', + 'percentile(val, array(0.1))', + 'percentile(val, array())', + 'percentile(val, array(0.1, 0.5, 0.9))', + 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))', + # There is issue with python data generation that still produces negative values for freq. + # See https://github.com/NVIDIA/spark-rapids/issues/9452. + # Thus, freq needs to be wrapped in abs. 
+ 'percentile(val, 0.1, abs(freq))', + 'percentile(val, 0, abs(freq))', + 'percentile(val, 1, abs(freq))', + 'percentile(val, array(0.1), abs(freq))', + 'percentile(val, array(), abs(freq))', + 'percentile(val, array(0.1, 0.5, 0.9), abs(freq))', + 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))' + ) + +@pytest.mark.parametrize('data_gen', exact_percentile_reduction_data_gen, ids=idfn) +def test_exact_percentile_reduction(data_gen): + assert_gpu_and_cpu_are_equal_collect( + lambda spark: exact_percentile_reduction(gen_df(spark, data_gen)) + ) + +exact_percentile_reduction_cpu_fallback_data_gen = [ + [('val', data_gen), + ('freq', LongGen(min_val=0, max_val=1000000, nullable=False) + .with_special_case(0, weight=100))] + for data_gen in [IntegerGen(), DoubleGen()]] + +@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning', + 'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec', + 'Percentile') +@pytest.mark.parametrize('data_gen', exact_percentile_reduction_cpu_fallback_data_gen, ids=idfn) +@pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn) +@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn) +@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494') +def test_exact_percentile_reduction_partial_fallback_to_cpu(data_gen, replace_mode, + use_obj_hash_agg): + cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault'] + exist_clz, non_exist_clz = [], [] + # For aggregations without distinct, Databricks runtime removes the partial Aggregate stage ( + # map-side combine). There only exists an AggregateExec in Databricks runtimes. So, we need to + # set the expected exist_classes according to runtime. + if is_databricks_runtime(): + if replace_mode == 'partial': + exist_clz, non_exist_clz = cpu_clz, gpu_clz + else: + exist_clz, non_exist_clz = gpu_clz, cpu_clz + else: + exist_clz = cpu_clz + gpu_clz + + assert_cpu_and_gpu_are_equal_collect_with_capture( + lambda spark: gen_df(spark, data_gen).selectExpr( + 'percentile(val, 0.1)', + 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))', + 'percentile(val, 0.1, abs(freq))', + 'percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))'), + exist_classes=','.join(exist_clz), + non_exist_classes=','.join(non_exist_clz), + conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode, + 'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg} + ) + + +exact_percentile_groupby_data_gen = [ + [('key', RepeatSeqGen(IntegerGen(), length=100)), + ('val', data_gen), + ('freq', LongGen(min_val=0, max_val=1000000, nullable=False) + .with_special_case(0, weight=100))] + for data_gen in exact_percentile_data_gen] + +def exact_percentile_groupby(df): + return df.groupby('key').agg( + f.expr('percentile(val, 0.1)'), + f.expr('percentile(val, 0)'), + f.expr('percentile(val, 1)'), + f.expr('percentile(val, array(0.1))'), + f.expr('percentile(val, array())'), + f.expr('percentile(val, array(0.1, 0.5, 0.9))'), + f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'), + # There is issue with python data generation that still produces negative values for freq. + # See https://github.com/NVIDIA/spark-rapids/issues/9452. + # Thus, freq needs to be wrapped in abs. 
+ f.expr('percentile(val, 0.1, abs(freq))'), + f.expr('percentile(val, 0, abs(freq))'), + f.expr('percentile(val, 1, abs(freq))'), + f.expr('percentile(val, array(0.1), abs(freq))'), + f.expr('percentile(val, array(), abs(freq))'), + f.expr('percentile(val, array(0.1, 0.5, 0.9), abs(freq))'), + f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))') + ) + +@ignore_order +@pytest.mark.parametrize('data_gen', exact_percentile_groupby_data_gen, ids=idfn) +def test_exact_percentile_groupby(data_gen): + assert_gpu_and_cpu_are_equal_collect( + lambda spark: exact_percentile_groupby(gen_df(spark, data_gen)) + ) + +exact_percentile_groupby_cpu_fallback_data_gen = [ + [('key', RepeatSeqGen(IntegerGen(), length=100)), + ('val', data_gen), + ('freq', LongGen(min_val=0, max_val=1000000, nullable=False) + .with_special_case(0, weight=100))] + for data_gen in [IntegerGen(), DoubleGen()]] + +@ignore_order +@allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec', 'ShuffleExchangeExec', 'HashPartitioning', + 'AggregateExpression', 'Alias', 'Cast', 'Literal', 'ProjectExec', + 'Percentile') +@pytest.mark.parametrize('data_gen', exact_percentile_groupby_cpu_fallback_data_gen, ids=idfn) +@pytest.mark.parametrize('replace_mode', ['partial', 'final|complete'], ids=idfn) +@pytest.mark.parametrize('use_obj_hash_agg', ['false', 'true'], ids=idfn) +@pytest.mark.xfail(condition=is_databricks104_or_later(), reason='https://github.com/NVIDIA/spark-rapids/issues/9494') +def test_exact_percentile_groupby_partial_fallback_to_cpu(data_gen, replace_mode, use_obj_hash_agg): + cpu_clz, gpu_clz = ['Percentile'], ['GpuPercentileDefault'] + exist_clz, non_exist_clz = [], [] + # For aggregations without distinct, Databricks runtime removes the partial Aggregate stage ( + # map-side combine). There only exists an AggregateExec in Databricks runtimes. So, we need to + # set the expected exist_classes according to runtime. 
+ if is_databricks_runtime(): + if replace_mode == 'partial': + exist_clz, non_exist_clz = cpu_clz, gpu_clz + else: + exist_clz, non_exist_clz = gpu_clz, cpu_clz + else: + exist_clz = cpu_clz + gpu_clz + + assert_cpu_and_gpu_are_equal_collect_with_capture( + lambda spark: gen_df(spark, data_gen).groupby('key').agg( + f.expr('percentile(val, 0.1)'), + f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1))'), + f.expr('percentile(val, 0.1, abs(freq))'), + f.expr('percentile(val, array(0, 0.0001, 0.5, 0.9999, 1), abs(freq))')), + exist_classes=','.join(exist_clz), + non_exist_classes=','.join(non_exist_clz), + conf={'spark.rapids.sql.hashAgg.replaceMode': replace_mode, + 'spark.sql.execution.useObjectHashAggregateExec': use_obj_hash_agg} + ) + + @ignore_order(local=True) @allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'HashAggregateExec', 'HashPartitioning', diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 92469f1768d..5ce6b57259a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3348,13 +3348,13 @@ object GpuOverrides extends Logging { "Collect a set of unique elements, not supported in reduction", ExprChecks.fullAgg( TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + - TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY), + TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY), TypeSig.ARRAY.nested(TypeSig.all), Seq(ParamCheck("input", (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + - TypeSig.NULL + - TypeSig.STRUCT + - TypeSig.ARRAY).nested(), + TypeSig.NULL + + TypeSig.STRUCT + + TypeSig.ARRAY).nested(), TypeSig.all))), (c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) { @@ -3422,6 +3422,73 @@ object GpuOverrides extends Logging { GpuVarianceSamp(childExprs.head, !legacyStatisticalAggregate) } }), + expr[Percentile]( + "Aggregation computing exact percentile", + ExprChecks.reductionAndGroupByAgg( + // The output can be a single number or array depending on whether percentiles param + // is a single number or an array. + TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE), + TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE), + Seq( + // ANSI interval types are new in Spark 3.2.0 and are not yet supported by the + // current GPU implementation. + ParamCheck("input", TypeSig.integral + TypeSig.fp, TypeSig.integral + TypeSig.fp), + ParamCheck("percentage", + TypeSig.lit(TypeEnum.DOUBLE) + TypeSig.ARRAY.nested(TypeSig.lit(TypeEnum.DOUBLE)), + TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE)), + ParamCheck("frequency", + TypeSig.LONG + TypeSig.ARRAY.nested(TypeSig.LONG), + TypeSig.LONG + TypeSig.ARRAY.nested(TypeSig.LONG)))), + (c, conf, p, r) => new TypedImperativeAggExprMeta[Percentile](c, conf, p, r) { + override def tagAggForGpu(): Unit = { + // Check if the input percentage can be supported on GPU. 
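+          // Only a non-null literal percentage (a scalar double or an array of doubles) with
+          // all values in the range [0, 1] can run on the GPU; anything else falls back to CPU.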
+ GpuOverrides.extractLit(childExprs(1).wrapped.asInstanceOf[Expression]) match { + case None => + willNotWorkOnGpu("percentile on GPU only supports literal percentages") + case Some(Literal(null, _)) => + willNotWorkOnGpu("percentile on GPU only supports non-null literal percentages") + case Some(Literal(a: ArrayData, _)) => { + if((0 until a.numElements).exists(a.isNullAt)) { + willNotWorkOnGpu( + "percentile on GPU does not support percentage arrays containing nulls") + } + if (a.toDoubleArray().exists(percentage => percentage < 0.0 || percentage > 1.0)) { + willNotWorkOnGpu( + "percentile requires the input percentages given in the range [0, 1]") + } + } + case Some(_) => // This is fine + } + } + + override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = { + val exprMeta = p.get.asInstanceOf[BaseExprMeta[_]] + val isReduction = exprMeta.context match { + case ReductionAggExprContext => true + case GroupByAggExprContext => false + case _ => throw new IllegalStateException( + s"Invalid aggregation context: ${exprMeta.context}") + } + GpuPercentile(childExprs.head, childExprs(1).asInstanceOf[GpuLiteral], childExprs(2), + isReduction) + } + // Declare the data type of the internal buffer so it can be serialized and + // deserialized correctly during shuffling. + override def aggBufferAttribute: AttributeReference = { + val aggBuffer = c.aggBufferAttributes.head + val dataType: DataType = ArrayType(StructType(Seq( + StructField("value", childExprs.head.dataType), + StructField("frequency", LongType))), containsNull = false) + aggBuffer.copy(dataType = dataType)(aggBuffer.exprId, aggBuffer.qualifier) + } + + override val needsAnsiCheck: Boolean = false + override val supportBufferConversion: Boolean = true + override def createCpuToGpuBufferConverter(): CpuToGpuAggregateBufferConverter = + CpuToGpuPercentileBufferConverter(childExprs.head.dataType) + override def createGpuToCpuBufferConverter(): GpuToCpuAggregateBufferConverter = + GpuToCpuPercentileBufferConverter(childExprs.head.dataType) + }), expr[ApproximatePercentile]( "Approximate percentile", ExprChecks.reductionAndGroupByAgg( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala new file mode 100644 index 00000000000..ddab05814f6 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuPercentile.scala @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.rapids.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer + +import ai.rapids.cudf +import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation} +import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed} +import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression +import com.nvidia.spark.rapids.jni.Histogram +import com.nvidia.spark.rapids.shims.ShimExpression + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, UnsafeArrayData, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class CudfHistogram(override val dataType: DataType) extends CudfAggregate { + override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = + (input: cudf.ColumnVector) => input.reduce(ReductionAggregation.histogram(), DType.LIST) + override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.histogram() + override val name: String = "CudfHistogram" +} + +case class CudfMergeHistogram(override val dataType: DataType) + extends CudfAggregate { + override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = + (input: cudf.ColumnVector) => + input.getType match { + // This is called from updateAggregate in GpuPercentileWithFrequency. + case DType.STRUCT => input.reduce(ReductionAggregation.mergeHistogram(), DType.LIST) + + // This is always called from mergeAggregate. + case DType.LIST => withResource(input.getChildColumnView(0)) { histogram => + histogram.reduce(ReductionAggregation.mergeHistogram(), DType.LIST) + } + + case _ => throw new IllegalStateException("Invalid input in CudfMergeHistogram.") + } + + override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHistogram() + override val name: String = "CudfMergeHistogram" +} + +/** + * Perform the final evaluation step to compute percentiles from histograms. 
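+ * The aggregation buffer handed to this step is a histogram: a LIST column of
+ * STRUCT(value, frequency) rows, from which the requested percentage(s) are computed.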
+ */ +case class GpuPercentileEvaluation(childExpr: Expression, percentage: Either[Double, Array[Double]], + outputType: DataType, isReduction: Boolean) + extends GpuExpression with ShimExpression { + override def dataType: DataType = outputType + override def prettyName: String = "percentile_evaluation" + override def nullable: Boolean = true + override def children: Seq[Expression] = Seq(childExpr) + + private lazy val percentageArray = percentage match { + case Left(p) => Array(p) + case Right(p) => p + } + private lazy val outputAsList = outputType match { + case _: ArrayType => true + case _ => false + } + + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + withResourceIfAllowed(childExpr.columnarEval(batch)) { histograms => + val percentiles = Histogram.percentileFromHistogram(histograms.getBase, + percentageArray, outputAsList) + GpuColumnVector.from(percentiles, outputType) + } + } +} + +abstract class GpuPercentile(childExpr: Expression, percentageLit: GpuLiteral, + isReduction: Boolean) + extends GpuAggregateFunction with Serializable { + protected lazy val histogramBufferType: DataType = + ArrayType(StructType(Seq(StructField("value", childExpr.dataType), + StructField("frequency", LongType))), + containsNull = false) + protected lazy val histogramBuffer: AttributeReference = + AttributeReference("histogramBuff", histogramBufferType)() + override def aggBufferAttributes: Seq[AttributeReference] = histogramBuffer :: Nil + + override lazy val initialValues: Seq[Expression] = + Seq(GpuLiteral.create(new GenericArrayData(Array.empty[Any]), histogramBufferType)) + override lazy val mergeAggregates: Seq[CudfAggregate] = + Seq(CudfMergeHistogram(histogramBufferType)) + override lazy val evaluateExpression: Expression = + GpuPercentileEvaluation(histogramBuffer, percentages, outputType, isReduction) + + override def dataType: DataType = histogramBufferType + override def prettyName: String = "percentile" + override def nullable: Boolean = true + override def children: Seq[Expression] = Seq(childExpr, percentageLit) + + private lazy val (returnPercentileArray, percentages): (Boolean, Either[Double, Array[Double]]) = + percentageLit.value match { + case null => (false, Right(Array())) + case num: Double => (false, Left(num)) + case arrayData: ArrayData => (true, Right(arrayData.toDoubleArray())) + case other => throw new IllegalStateException(s"Invalid percentage expression: $other") + } + private lazy val outputType: DataType = + if (returnPercentileArray) ArrayType(DoubleType, containsNull = false) else DoubleType +} + +/** + * Compute percentiles from just the input values. + */ +case class GpuPercentileDefault(childExpr: Expression, percentage: GpuLiteral, + isReduction: Boolean) + extends GpuPercentile(childExpr, percentage, isReduction) { + override lazy val inputProjection: Seq[Expression] = Seq(childExpr) + override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHistogram(histogramBufferType)) +} + +/** + * Compute percentiles from the input values associated with frequencies. 
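+ * Each (value, frequency) pair is packed into a histogram element up front by
+ * GpuCreateHistogramIfValid, so the update step can reuse CudfMergeHistogram directly.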
+ */
+case class GpuPercentileWithFrequency(childExpr: Expression, percentage: GpuLiteral,
+                                      frequencyExpr: Expression, isReduction: Boolean)
+  extends GpuPercentile(childExpr, percentage, isReduction) {
+  override lazy val inputProjection: Seq[Expression] = {
+    val outputType: DataType = if (isReduction) {
+      StructType(Seq(StructField("value", childExpr.dataType), StructField("frequency", LongType)))
+    } else {
+      histogramBufferType
+    }
+    Seq(GpuCreateHistogramIfValid(childExpr, frequencyExpr, isReduction, outputType))
+  }
+  override lazy val updateAggregates: Seq[CudfAggregate] =
+    Seq(CudfMergeHistogram(histogramBufferType))
+}
+
+/**
+ * Create a histogram buffer from the input values and frequencies.
+ *
+ * The frequencies are also checked to ensure that they are non-negative. If a negative frequency
+ * exists, an exception will be thrown.
+ */
+case class GpuCreateHistogramIfValid(valuesExpr: Expression, frequenciesExpr: Expression,
+                                     isReduction: Boolean, outputType: DataType)
+  extends GpuExpression with ShimExpression {
+  override def dataType: DataType = outputType
+  override def prettyName: String = "create_histogram_if_valid"
+  override def nullable: Boolean = false
+  override def children: Seq[Expression] = Seq(valuesExpr, frequenciesExpr)
+
+  override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
+    withResourceIfAllowed(valuesExpr.columnarEval(batch)) { values =>
+      withResourceIfAllowed(frequenciesExpr.columnarEval(batch)) { frequencies =>
+        // If a negative frequency exists, an exception will be thrown from here.
+        val histograms = Histogram.createHistogramIfValid(values.getBase, frequencies.getBase,
+          /*outputAsLists = */ !isReduction)
+        GpuColumnVector.from(histograms, outputType)
+      }
+    }
+  }
+}
+
+object GpuPercentile {
+  def apply(childExpr: Expression, percentageLit: GpuLiteral, frequencyExpr: Expression,
+            isReduction: Boolean): GpuPercentile = {
+    frequencyExpr match {
+      case GpuLiteral(freq, LongType) if freq == 1 =>
+        GpuPercentileDefault(childExpr, percentageLit, isReduction)
+      case _ =>
+        GpuPercentileWithFrequency(childExpr, percentageLit, frequencyExpr, isReduction)
+    }
+  }
+}
+
+/**
+ * Convert the incoming byte stream received from the Spark CPU into the internal histogram
+ * buffer format.
+ */
+case class CpuToGpuPercentileBufferConverter(elementType: DataType)
+  extends CpuToGpuAggregateBufferConverter {
+  override def createExpression(child: Expression): CpuToGpuBufferTransition =
+    CpuToGpuPercentileBufferTransition(child, elementType)
+}
+
+case class CpuToGpuPercentileBufferTransition(override val child: Expression, elementType: DataType)
+  extends CpuToGpuBufferTransition {
+  override def dataType: DataType =
+    ArrayType(StructType(Seq(StructField("value", elementType),
+      StructField("frequency", LongType))),
+      containsNull = false)
+
+  // Deserialize the input byte stream into the internal histogram buffer format.
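+  // The CPU buffer is a stream of length-prefixed UnsafeRows, one (value, frequency) pair
+  // per row, terminated by a negative length.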
+  override protected def nullSafeEval(input: Any): ArrayData = {
+    val bytes = input.asInstanceOf[Array[Byte]]
+    val bis = new ByteArrayInputStream(bytes)
+    val ins = new DataInputStream(bis)
+
+    // Store a column of STRUCT(value, frequency) rows.
+    val histogram = ArrayBuffer[InternalRow]()
+    val row = new UnsafeRow(2)
+
+    try {
+      var sizeOfNextRow = ins.readInt()
+      while (sizeOfNextRow >= 0) {
+        val bs = new Array[Byte](sizeOfNextRow)
+        ins.readFully(bs)
+        row.pointTo(bs, sizeOfNextRow)
+        val element = row.get(0, elementType)
+        val count = row.get(1, LongType).asInstanceOf[Long]
+        histogram.append(InternalRow.apply(element, count))
+        sizeOfNextRow = ins.readInt()
+      }
+      ArrayData.toArrayData(histogram)
+    } finally {
+      ins.close()
+      bis.close()
+    }
+  }
+}
+
+/**
+ * Convert the internal histogram buffer into a byte stream that can be deserialized by the
+ * Spark CPU.
+ */
+case class GpuToCpuPercentileBufferConverter(elementType: DataType)
+  extends GpuToCpuAggregateBufferConverter {
+  override def createExpression(child: Expression): GpuToCpuBufferTransition =
+    GpuToCpuPercentileBufferTransition(child, elementType)
+}
+
+case class GpuToCpuPercentileBufferTransition(override val child: Expression, elementType: DataType)
+  extends GpuToCpuBufferTransition {
+  // Serialize the internal histogram buffer into a byte array.
+  override protected def nullSafeEval(input: Any): Array[Byte] = {
+    val buffer = new Array[Byte](4 << 10) // 4K
+    val bos = new ByteArrayOutputStream()
+    val out = new DataOutputStream(bos)
+
+    val histogram = input.asInstanceOf[UnsafeArrayData]
+    val projection = UnsafeProjection.create(Array[DataType](elementType, LongType))
+
+    try {
+      (0 until histogram.numElements()).foreach { i =>
+        val row = histogram.getStruct(i, 2)
+        val element = row.get(0, elementType)
+        // The internal histogram buffer may contain null elements.
+        // We need to skip them as the Spark CPU does not process nulls after
+        // the updateAggregates step.
+        if (element != null) {
+          val unsafeRow = projection.apply(row)
+          out.writeInt(unsafeRow.getSizeInBytes)
+          unsafeRow.writeToStream(out, buffer)
+        }
+      }
+      // Need to write a negative integer to indicate the end of the stream.
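+      // The CPU side stops reading at the first negative size, which keeps this converter
+      // wire-compatible with the CPU Percentile buffer.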
+ out.writeInt(-1) + out.flush() + bos.toByteArray + } finally { + out.close() + bos.close() + } + } +} diff --git a/tools/generated_files/operatorsScore.csv b/tools/generated_files/operatorsScore.csv index 0c583853490..b904921ca2b 100644 --- a/tools/generated_files/operatorsScore.csv +++ b/tools/generated_files/operatorsScore.csv @@ -182,6 +182,7 @@ NthValue,4 OctetLength,4 Or,4 PercentRank,4 +Percentile,4 PivotFirst,4 Pmod,4 PosExplode,4 diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index 5b16293d07a..38c504f8cd1 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -668,6 +668,14 @@ Min,S,`min`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NA,PS,NS Min,S,`min`,None,reduction,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NA,PS,NS Min,S,`min`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS Min,S,`min`,None,window,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS +Percentile,S,`percentile`,None,aggregation,input,NA,S,S,S,S,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +Percentile,S,`percentile`,None,aggregation,percentage,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA +Percentile,S,`percentile`,None,aggregation,frequency,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA +Percentile,S,`percentile`,None,aggregation,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA +Percentile,S,`percentile`,None,reduction,input,NA,S,S,S,S,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +Percentile,S,`percentile`,None,reduction,percentage,NA,NA,NA,NA,NA,NA,PS,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA +Percentile,S,`percentile`,None,reduction,frequency,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA +Percentile,S,`percentile`,None,reduction,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,S,NA,NA,NA PivotFirst,S, ,None,aggregation,pivotColumn,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,NS,NS PivotFirst,S, ,None,aggregation,valueColumn,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NS,NS,NS PivotFirst,S, ,None,aggregation,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,NS,NS,NS
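
For reference, a minimal usage sketch of the aggregation added by this patch. It assumes an
existing SparkSession named `spark` with the RAPIDS Accelerator enabled; the toy data frame
below is illustrative only, mirroring the (key, val, freq) shape used in the tests above:

    from pyspark.sql import functions as f

    df = spark.createDataFrame(
        [(1, 10.0, 2), (1, 20.0, 1), (2, 30.0, 4)],
        ['key', 'val', 'freq'])

    # A scalar percentage yields a DOUBLE result; an array of percentages yields ARRAY<DOUBLE>.
    df.groupby('key').agg(
        f.expr('percentile(val, 0.5)'),
        f.expr('percentile(val, array(0.1, 0.9))'),
        # The optional third argument weights each value by its (non-negative) frequency.
        f.expr('percentile(val, 0.5, freq)')).show()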