From fd5bf91c6beb6dd0d3f39cf8057fcd41bec018fa Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 24 Sep 2020 16:10:38 -0500 Subject: [PATCH 1/4] Add in support for lead/lag and fix a few window operations Signed-off-by: Robert (Bobby) Evans --- docs/compatibility.md | 7 + docs/configs.md | 2 + integration_tests/src/main/python/data_gen.py | 10 + .../src/main/python/window_function_test.py | 84 ++- .../nvidia/spark/rapids/GpuOverrides.scala | 23 +- .../nvidia/spark/rapids/GpuWindowExec.scala | 112 +--- .../spark/rapids/GpuWindowExpression.scala | 566 +++++++++++------- .../spark/sql/rapids/AggregateFunctions.scala | 35 +- .../spark/rapids/WindowFunctionSuite.scala | 6 + 9 files changed, 513 insertions(+), 332 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index 36872134778..b87cc4eec5d 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -206,6 +206,13 @@ Spark stores timestamps internally relative to the JVM time zone. Converting an arbitrary timestamp between time zones is not currently supported on the GPU. Therefore operations involving timestamps will only be GPU-accelerated if the time zone used by the JVM is UTC. +## Window Functions + +Because of ordering differences between the CPU and the GPU window functions especially row based +window functions like `row_number`, `lead`, and `lag` can produce different results if the ordering +includes both `-0.0` and `0.0`, or if the ordering is ambiguous. Spark can produce +different results from one run to another if the ordering is ambiguous on a window function too. + ## Casting between types In general, performing `cast` and `ansi_cast` operations on the GPU is compatible with the same operations on the CPU. However, there are some exceptions. For this reason, certain casts are disabled on the GPU by default and require configuration options to be specified to enable them. diff --git a/docs/configs.md b/docs/configs.md index 534107481b9..1df6ca1b898 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -153,7 +153,9 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.IsNotNull|`isnotnull`|Checks if a value is not null|true|None| spark.rapids.sql.expression.IsNull|`isnull`|Checks if a value is null|true|None| spark.rapids.sql.expression.KnownFloatingPointNormalized| |Tag to prevent redundant normalization|true|None| +spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None| spark.rapids.sql.expression.LastDay|`last_day`|Returns the last day of the month which the date belongs to|true|None| +spark.rapids.sql.expression.Lead|`lead`|Window function that returns N entries ahead of this one|true|None| spark.rapids.sql.expression.Length|`length`, `character_length`, `char_length`|String character length|true|None| spark.rapids.sql.expression.LessThan|`<`|< operator|true|None| spark.rapids.sql.expression.LessThanOrEqual|`<=`|<= operator|true|None| diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index c9d11b2da07..0f976894550 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -541,6 +541,16 @@ def gen_scalar(data_gen, seed=0, force_no_nulls=False): v = list(gen_scalars(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls)) return v[0] +def gen_scalar_values(data_gen, count, seed=0, force_no_nulls=False): + """Generate scalar values.""" + src = _gen_scalars_common(data_gen, count, seed=seed) + return (src.gen(force_no_nulls=force_no_nulls) for i in range(0, count)) + +def gen_scalar_value(data_gen, seed=0, force_no_nulls=False): + """Generate a single scalar value.""" + v = list(gen_scalar_values(data_gen, 1, seed=seed, force_no_nulls=force_no_nulls)) + return v[0] + def debug_df(df): """print out the contents of a dataframe for debugging.""" print('COLLECTED\n{}'.format(df.collect())) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 360e0cd1781..294a6747fda 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -18,6 +18,8 @@ from data_gen import * from pyspark.sql.types import * from marks import * +from pyspark.sql.window import Window +import pyspark.sql.functions as f _grpkey_longs_with_no_nulls = [ ('a', RepeatSeqGen(LongGen(nullable=False), length=20)), @@ -58,11 +60,87 @@ def test_window_aggs_for_rows(data_gen): ' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, ' ' count(1) over ' ' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, ' + # once https://github.com/NVIDIA/spark-rapids/issues/218 is fixed uncomment this + #' count(c) over ' + #' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_c, ' ' row_number() over ' ' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num ' 'from window_agg_table ') +part_and_order_gens = [long_gen, DoubleGen(no_nans=True, special_cases=[]), + string_gen, boolean_gen, timestamp_gen] + +lead_lag_data_gens = [long_gen, DoubleGen(no_nans=True, special_cases=[]), + boolean_gen, timestamp_gen] + +def meta_idfn(meta): + def tmp(something): + return meta + idfn(something) + return tmp + +@ignore_order +@approximate_float +@pytest.mark.parametrize('c_gen', lead_lag_data_gens, ids=idfn) +@pytest.mark.parametrize('b_gen', part_and_order_gens, ids=meta_idfn('orderBy:')) +@pytest.mark.parametrize('a_gen', part_and_order_gens, ids=meta_idfn('partBy:')) +def test_multi_types_window_aggs_for_rows_lead_lag(a_gen, b_gen, c_gen): + data_gen = [ + ('a', RepeatSeqGen(a_gen, length=20)), + ('b', b_gen), + ('c', c_gen)] + # TODO range based appears to be the default without rowsBetween + # and fails horribly even when there is no rangeBetween + # ordering needs to include c because with nulls and especially on booleans + # it is possible to get a different ordering when it is ambiguous + baseWindowSpec = Window.partitionBy('a').orderBy('b', 'c') + inclusiveWindowSpec = baseWindowSpec.rowsBetween(-10, 100) + + defaultVal = gen_scalar_value(c_gen, force_no_nulls=False) + + def do_it(spark): + # once https://github.com/NVIDIA/spark-rapids/issues/218 is fixed uncomment this and put it in place below + #.withColumn('inc_count_c', f.count('c').over(inclusiveWindowSpec)) \ + return gen_df(spark, data_gen, length=2048) \ + .withColumn('inc_count_1', f.count('*').over(inclusiveWindowSpec)) \ + .withColumn('inc_max_c', f.max('c').over(inclusiveWindowSpec)) \ + .withColumn('inc_min_c', f.min('c').over(inclusiveWindowSpec)) \ + .withColumn('lead_5_c', f.lead('c', 5).over(baseWindowSpec)) \ + .withColumn('lead_def_c', f.lead('c', 2, defaultVal).over(baseWindowSpec)) \ + .withColumn('lag_1_c', f.lag('c', 1).over(baseWindowSpec)) \ + .withColumn('lag_def_c', f.lag('c', 4, defaultVal).over(baseWindowSpec)) \ + .withColumn('row_num', f.row_number().over(baseWindowSpec)) + assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.rapids.sql.hasNans': 'false'}) + +# lead and lag don't currently work for strig columns, so redo the tests, but just for strings +# without lead and lag +@ignore_order +@approximate_float +@pytest.mark.parametrize('c_gen', [string_gen], ids=idfn) +@pytest.mark.parametrize('b_gen', part_and_order_gens, ids=meta_idfn('orderBy:')) +@pytest.mark.parametrize('a_gen', part_and_order_gens, ids=meta_idfn('partBy:')) +def test_multi_types_window_aggs_for_rows(a_gen, b_gen, c_gen): + data_gen = [ + ('a', RepeatSeqGen(a_gen, length=20)), + ('b', b_gen), + ('c', c_gen)] + # TODO range based appears to be the default without rowsBetween + # and fails horribly even when there is no rangeBetween + # ordering needs to include c because with nulls and especially on booleans + # it is possible to get a different ordering when it is ambiguous + baseWindowSpec = Window.partitionBy('a').orderBy('b', 'c') + inclusiveWindowSpec = baseWindowSpec.rowsBetween(-10, 100) + + def do_it(spark): + # once https://github.com/NVIDIA/spark-rapids/issues/218 is fixed uncomment this and put it in place below + #.withColumn('inc_count_c', f.count('c').over(inclusiveWindowSpec)) \ + return gen_df(spark, data_gen, length=2048) \ + .withColumn('inc_count_1', f.count('*').over(inclusiveWindowSpec)) \ + .withColumn('inc_max_c', f.max('c').over(inclusiveWindowSpec)) \ + .withColumn('inc_min_c', f.min('c').over(inclusiveWindowSpec)) \ + .withColumn('row_num', f.row_number().over(baseWindowSpec)) + assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.rapids.sql.hasNans': 'false'}) + # Test for RANGE queries, with timestamp order-by expressions. # Non-timestamp order-by columns are currently unsupported for RANGE queries. # See https://github.com/NVIDIA/spark-rapids/issues/216 @@ -86,6 +164,10 @@ def test_window_aggs_for_ranges(data_gen): ' count(1) over ' ' (partition by a order by cast(b as timestamp) asc ' ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, ' + # once https://github.com/NVIDIA/spark-rapids/issues/218 is fixed uncomment this + #' count(c) over ' + #' (partition by a order by cast(b as timestamp) asc ' + #' range between CURRENT ROW and UNBOUNDED following) as count_c_asc, ' ' sum(c) over ' ' (partition by a order by cast(b as timestamp) asc ' ' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, ' @@ -94,7 +176,6 @@ def test_window_aggs_for_ranges(data_gen): ' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded ' 'from window_agg_table') - @pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns " "(https://github.com/NVIDIA/spark-rapids/issues/216)") @ignore_order @@ -110,7 +191,6 @@ def test_window_aggs_for_ranges_of_dates(data_gen): 'from window_agg_table' ) - @pytest.mark.xfail(reason="[BUG] `COUNT(x)` should not count null values of `x` " "(https://github.com/NVIDIA/spark-rapids/issues/218)") @ignore_order diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 274ccf53753..ac2cf045701 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -494,8 +494,13 @@ object GpuOverrides { (lit, conf, p, r) => new ExprMeta[Literal](lit, conf, p, r) { override def convertToGpu(): GpuExpression = GpuLiteral(lit.value, lit.dataType) - // There are so many of these that we don't need to print them out. - override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = {} + // There are so many of these that we don't need to print them out, unless it + // will not work on the GPU + override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = { + if (!this.canThisBeReplaced) { + super.print(append, depth, all) + } + } /** * We are overriding this method because currently we only support CalendarIntervalType @@ -600,6 +605,20 @@ object GpuOverrides { override def convertToGpu(): GpuExpression = GpuRowNumber() } ), + expr[Lead]( + "Window function that returns N entries ahead of this one", + (lead, conf, p, r) => new OffsetWindowFunctionMeta[Lead](lead, conf, p, r) { + override def convertToGpu(): GpuExpression = + GpuLead(input.convertToGpu(), offset.convertToGpu(), default.convertToGpu()) + } + ), + expr[Lag]( + "Window function that returns N entries behind this one", + (lag, conf, p, r) => new OffsetWindowFunctionMeta[Lag](lag, conf, p, r) { + override def convertToGpu(): GpuExpression = + GpuLag(input.convertToGpu(), offset.convertToGpu(), default.convertToGpu()) + } + ), expr[UnaryMinus]( "Negate a numeric value", (a, conf, p, r) => new UnaryExprMeta[UnaryMinus](a, conf, p, r) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index fb3f5fcc899..fad31206278 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -16,19 +16,13 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.NvtxColor -import com.nvidia.spark.rapids.GpuMetricNames._ - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.rapids.GpuAggregateExpression -import org.apache.spark.sql.types.IntegerType -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.vectorized.ColumnarBatch class GpuWindowExecMeta(windowExec: WindowExec, conf: RapidsConf, @@ -56,7 +50,7 @@ class GpuWindowExecMeta(windowExec: WindowExec, val resultMethod = windowExec.getClass.getMethod("windowExpression") resultMethod.invoke(windowExec).asInstanceOf[Seq[NamedExpression]] } catch { - case e: NoSuchMethodException => + case _: NoSuchMethodException => resultColumnsOnly = true val winExpr = windowExec.getClass.getMethod("projectList") winExpr.invoke(windowExec).asInstanceOf[Seq[NamedExpression]] @@ -75,7 +69,6 @@ class GpuWindowExecMeta(windowExec: WindowExec, windowExec.orderSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this))) override def tagPlanForGpu(): Unit = { - // Implementation depends on receiving a `NamedExpression` wrapped WindowExpression. windowExpressions.map(meta => meta.wrapped) .filter(expr => !expr.isInstanceOf[NamedExpression]) @@ -126,98 +119,27 @@ case class GpuWindowExec( override def outputPartitioning: Partitioning = child.outputPartitioning - override lazy val additionalMetrics: Map[String, SQLMetric] = - Map( - NUM_INPUT_ROWS -> - SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_ROWS), - NUM_INPUT_BATCHES -> - SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_INPUT_BATCHES), - PEAK_DEVICE_MEMORY -> - SQLMetrics.createSizeMetric(sparkContext, DESCRIPTION_PEAK_DEVICE_MEMORY) - ) - - // Job metrics. - private var maxDeviceMemory = 0L - private val peakDeviceMemoryMetric = metrics(GpuMetricNames.PEAK_DEVICE_MEMORY) - private val numInputBatchesMetric = metrics(GpuMetricNames.NUM_INPUT_BATCHES) - private val numInputRowsMetric = metrics(GpuMetricNames.NUM_INPUT_ROWS) - private val numOutputBatchesMetric = metrics(GpuMetricNames.NUM_OUTPUT_BATCHES) - private val numOutputRowsMetric = metrics(GpuMetricNames.NUM_OUTPUT_ROWS) - private val totalTimeMetric = metrics(GpuMetricNames.TOTAL_TIME) - override protected def doExecute(): RDD[InternalRow] = throw new IllegalStateException(s"Row-based execution should not happen, in $this.") - private def bindReferences() : Seq[GpuExpression] = { - - // Address bindings for all expressions evaluated by WindowExec. - val boundProjectList = windowExpressionAliases.map( - alias => GpuBindReferences.bindReference(alias, child.output)) - - // Bind aggregation column. - boundProjectList.map( - expr => expr.transform { - case windowExpr: GpuWindowExpression => - val boundAggExpression = GpuBindReferences.bindReference( - windowExpr.windowFunction match { - case aggExpression: GpuAggregateExpression => - aggExpression.aggregateFunction.inputProjection.head - case _ : GpuRowNumber => GpuLiteral(1, IntegerType) - case anythingElse => - throw new IllegalStateException(s"Unexpected window operation " + - s"${anythingElse.prettyName}") - }, - child.output) - windowExpr.setBoundAggCol(boundAggExpression) - windowExpr - }.asInstanceOf[GpuExpression] - ) - } - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputBatches = metrics(GpuMetricNames.NUM_OUTPUT_BATCHES) + val numOutputRows = metrics(GpuMetricNames.NUM_OUTPUT_ROWS) + val totalTime = metrics(GpuMetricNames.TOTAL_TIME) + + val projectList = if (resultColumnsOnly) { + windowExpressionAliases + } else { + child.output ++ windowExpressionAliases + } - val boundOutputProjectList = bindReferences() - - val input = child.executeColumnar() - input.map { - cb => { - - numInputBatchesMetric += 1 - numInputRowsMetric += cb.numRows - - var originalCols: Array[GpuColumnVector] = null - var aggCols : Array[GpuColumnVector] = null - - try { - originalCols = GpuColumnVector.extractColumns(cb) - - withResource( - new NvtxWithMetrics( - "WindowExec projections", NvtxColor.GREEN, totalTimeMetric) - ) { _ => - aggCols = boundOutputProjectList.map( - _.columnarEval(cb).asInstanceOf[GpuColumnVector]).toArray - } - - numOutputBatchesMetric += 1 - numOutputRowsMetric += cb.numRows - - val outputBatch = if (resultColumnsOnly) { - new ColumnarBatch(aggCols.asInstanceOf[Array[ColumnVector]], cb.numRows()) - } else { - originalCols.foreach(_.incRefCount()) - new ColumnarBatch(originalCols ++ aggCols, cb.numRows()) - } - - maxDeviceMemory = maxDeviceMemory.max( - GpuColumnVector.getTotalDeviceMemoryUsed(outputBatch)) - peakDeviceMemoryMetric.set(maxDeviceMemory) + val boundProjectList = + GpuBindReferences.bindGpuReferences(projectList, child.output) - outputBatch - } finally { - cb.close() - } - } + child.executeColumnar().map { cb => + numOutputBatches += 1 + numOutputRows += cb.numRows + GpuProjectExec.projectAndClose(cb, boundProjectList, totalTime) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 7ef35a99cf2..47a1a6675c0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -16,14 +16,14 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{Aggregation, AggregationOverWindow, DType, Table, WindowOptions} +import ai.rapids.cudf.{Aggregation, AggregationOnColumn, ColumnVector, WindowOptions} import com.nvidia.spark.rapids.GpuOverrides.wrapExpr import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.rapids._ +import org.apache.spark.sql.rapids.GpuAggregateExpression import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.CalendarInterval @@ -35,6 +35,29 @@ class GpuWindowExpressionMeta( rule: ConfKeysAndIncompat) extends ExprMeta[WindowExpression](windowExpression, conf, parent, rule) { + private def getBoundaryValue(boundary : Expression) : Int = boundary match { + case literal: Literal => + literal.dataType match { + case IntegerType => + literal.value.asInstanceOf[Int] + case CalendarIntervalType => + val ci = literal.value.asInstanceOf[CalendarInterval] + if (ci.months != 0 || ci.microseconds != 0) { + willNotWorkOnGpu("only days are supported for window range intervals") + } + ci.days + case t => + willNotWorkOnGpu(s"unsupported window boundary type $t") + -1 + } + case UnboundedPreceding => Int.MinValue + case UnboundedFollowing => Int.MaxValue + case CurrentRow => 0 + case _ => + willNotWorkOnGpu("unsupported window boundary type") + -1 + } + override def tagExprForGpu(): Unit = { // Must have two children: @@ -51,13 +74,17 @@ class GpuWindowExpressionMeta( windowFunction match { case aggregateExpression : AggregateExpression => aggregateExpression.aggregateFunction match { - case Count(exp) => { + // Count does not work in these cases because of a bug in cudf where a rolling count + // does not do the correct thing for null entries + // TODO link to issue. + // Once that is fixed this can be deleted and the check will fall through. + case Count(exp) => if (!exp.forall(x => x.isInstanceOf[Literal])) { willNotWorkOnGpu(s"Currently, only COUNT(1) and COUNT(*) are supported. " + - s"COUNT($exp) is not supported in windowing.") + s"COUNT($exp) is not supported in windowing.") } - } - case Sum(_) | Min(_) | Max(_) => // Supported. + // Sadly we have to white list the aggregations because they do not all work + case Count(_) | Sum(_) | Min(_) | Max(_) => // Supported. case other: AggregateFunction => willNotWorkOnGpu(s"AggregateFunction ${other.prettyName} " + s"is not supported in windowing.") @@ -65,18 +92,74 @@ class GpuWindowExpressionMeta( willNotWorkOnGpu(s"Expression not supported in windowing. " + s"Found ${anythingElse.prettyName}") } - - case RowNumber() => - + case _: WindowFunction => case _ => willNotWorkOnGpu("Only AggregateExpressions are supported on GPU as WindowFunctions. " + s"Found ${windowFunction.prettyName}") } - val spec = wrapped.windowSpec - if (!spec.frameSpecification.isInstanceOf[SpecifiedWindowFrame]) { - willNotWorkOnGpu(s"Only SpecifiedWindowFrame is a supported window-frame specification. " + - s"Found ${spec.frameSpecification.prettyName}") + wrapped.windowSpec.frameSpecification match { + case spec: SpecifiedWindowFrame => + // Will also verify that the types are what we expect. + val lower = getBoundaryValue(spec.lower) + val upper = getBoundaryValue(spec.upper) + spec.frameType match { + case RowFrame => + windowFunction match { + case Lead(_, _, _) | Lag(_, _, _) => // ignored we are good + case _ => + // need to be sure that the lower/upper are acceptable + if (lower > 0) { + willNotWorkOnGpu(s"lower-bounds ahead of current row is not supported. " + + s"Found $lower") + } + if (upper < 0) { + willNotWorkOnGpu(s"upper-bounds behind the current row is not supported. " + + s"Found $upper") + } + } + case RangeFrame => + // Spark by default does a RangeFrame if no RowFrame is given + // even for columns that are not time type columns. We can switch this back to row + // based iff the ranges we are looking at are current row or unbounded and the columns + // we are ordering on, are not nullable, because a null is by definition outside of the + // range, even of unbounded. + val orderSpec = wrapped.windowSpec.orderSpec + val allTime = orderSpec.forall { so => + so.dataType match { + case DateType | TimestampType => true + case _ => false + } + } + val allNotTime = orderSpec.forall { so => + so.dataType match { + case DateType | TimestampType => false + case _ => true + } + } + if (allNotTime) { + val allNullable = orderSpec.forall(_.nullable) + val areLowerAndUpperOkay = + (lower == 0 || lower == Int.MaxValue || lower == Int.MinValue) && + (upper == 0 || upper == Int.MaxValue || upper == Int.MinValue) + if (!allNullable || !areLowerAndUpperOkay) { + willNotWorkOnGpu("range based windows on non-date/time columns is only" + + " supported if the columns are nullable and for very specific rage values.") + } + } else if (allTime){ + if (orderSpec.length > 1) { + // We only support a single time column + willNotWorkOnGpu("only a single date/time based column in window" + + " range functions is supported") + } + } else { + willNotWorkOnGpu("a mixture of date/time and non date/time based" + + " columns is not supported in a window range function") + } + } + case other => + willNotWorkOnGpu(s"only SpecifiedWindowFrame is a supported window-frame specification. " + + s"Found ${other.prettyName}") } } @@ -105,18 +188,41 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow override def sql: String = windowFunction.sql + " OVER " + windowSpec.sql - private var boundAggCol : Expression = _ - private val frameType : FrameType = - windowSpec.frameSpecification.asInstanceOf[GpuSpecifiedWindowFrame].frameType - - def setBoundAggCol(bound : Expression) : Unit = { - boundAggCol = bound + private val windowFrameSpec = windowSpec.frameSpecification.asInstanceOf[GpuSpecifiedWindowFrame] + private val frameType : FrameType = windowFrameSpec.frameType + private val windowFunc = windowFunction match { + case func: GpuAggregateWindowFunction => func + case agg: GpuAggregateExpression => agg.aggregateFunction match { + case func: GpuAggregateWindowFunction => func + case other => + throw new IllegalStateException(s"${other.getClass} is not a supported window aggregation") + } + case other => + throw new IllegalStateException(s"${other.getClass} is not a supported window function") + } + private lazy val boundRowProjectList = windowSpec.partitionSpec ++ + windowFunc.windowInputProjection + private lazy val boundRangeProjectList = windowSpec.partitionSpec ++ + windowSpec.orderSpec.map(_.child.asInstanceOf[GpuExpression]) ++ + windowFunc.windowInputProjection + + private lazy val allNotTime = windowSpec.orderSpec.forall { so => + so.dataType match { + case DateType | TimestampType => false + case _ => true + } } override def columnarEval(cb: ColumnarBatch) : Any = { frameType match { case RowFrame => evaluateRowBasedWindowExpression(cb) - case RangeFrame => evaluateRangeBasedWindowExpression(cb) + case RangeFrame => + if (allNotTime) { + // We already verified that this will be okay... + evaluateRowBasedWindowExpression(cb) + } else { + evaluateRangeBasedWindowExpression(cb) + } case allElse => throw new UnsupportedOperationException( s"Unsupported window expression frame type: $allElse") @@ -124,184 +230,98 @@ case class GpuWindowExpression(windowFunction: Expression, windowSpec: GpuWindow } private def evaluateRowBasedWindowExpression(cb : ColumnarBatch) : GpuColumnVector = { + val numGroupingColumns = windowSpec.partitionSpec.length + val totalExtraColumns = numGroupingColumns - var groupingColsCB : ColumnarBatch = null - var aggregationColsCB : ColumnarBatch = null - var groupingCols : Array[GpuColumnVector] = null - var aggregationCols : Array[GpuColumnVector] = null - var inputTable : Table = null - var aggResultTable : Table = null - - try { - // Project required column batches. - groupingColsCB = GpuProjectExec.project(cb, windowSpec.partitionSpec) - aggregationColsCB = GpuProjectExec.project(cb, Seq(boundAggCol)) - // Extract required columns columns. - groupingCols = GpuColumnVector.extractColumns(groupingColsCB) - aggregationCols = GpuColumnVector.extractColumns(aggregationColsCB) - - inputTable = new Table( ( groupingCols ++ aggregationCols ).map(_.getBase) : _* ) - - aggResultTable = inputTable.groupBy(0 until groupingColsCB.numCols(): _*) - .aggregateWindows( - GpuWindowExpression.getRowBasedWindowFrame( - groupingColsCB.numCols(), - windowFunction, - windowSpec.frameSpecification.asInstanceOf[GpuSpecifiedWindowFrame] - ) - ) - - val aggColumn = windowFunction match { - - // Special-case handling for COUNT(1)/COUNT(*): - // GpuCount aggregation expects to return LongType (INT64), - // but CUDF returns IntType (INT32) for COUNT() window function. Must cast back up to INT64. - case GpuAggregateExpression(GpuCount(_), _, _, _, _) => - aggResultTable.getColumn(0).castTo(DType.INT64) // Aggregation column is at index `0`. + val aggColumn = withResource(GpuProjectExec.project(cb, boundRowProjectList)) { projected => + withResource(GpuColumnVector.from(projected)) { table => + val bases = GpuColumnVector.extractBases(projected).zipWithIndex + .slice(totalExtraColumns, boundRowProjectList.length) - case _ => - val origAggColumn = aggResultTable.getColumn(0) // Aggregation column is at index `0`. - origAggColumn.incRefCount() - origAggColumn - } + val agg = windowFunc.windowAggregation(bases) + .overWindow(GpuWindowExpression.getRowBasedWindowOptions(windowFrameSpec)) + withResource(table + .groupBy(0 until numGroupingColumns: _*) + .aggregateWindows(agg)) { aggResultTable => + aggResultTable.getColumn(0).incRefCount() + } + } + } + val expectedType = GpuColumnVector.getRapidsType(windowFunc.dataType) + if (expectedType != aggColumn.getDataType) { + withResource(aggColumn) { aggColumn => + GpuColumnVector.from(aggColumn.castTo(expectedType)) + } + } else { GpuColumnVector.from(aggColumn) - } finally { - if (groupingColsCB != null) groupingColsCB.close() - if (aggregationColsCB != null) aggregationColsCB.close() - if (inputTable != null) inputTable.close() - if (aggResultTable != null) aggResultTable.close() } } private def evaluateRangeBasedWindowExpression(cb : ColumnarBatch) : GpuColumnVector = { - - var groupingColsCB : ColumnarBatch = null - var sortColsCB : ColumnarBatch = null - var aggregationColsCB : ColumnarBatch = null - var groupingCols : Array[GpuColumnVector] = null - var sortCols : Array[GpuColumnVector] = null - var aggregationCols : Array[GpuColumnVector] = null - var inputTable : Table = null - var aggResultTable : Table = null - - try { - // Project required column batches. - groupingColsCB = GpuProjectExec.project(cb, windowSpec.partitionSpec) - assert(windowSpec.orderSpec.size == 1, "Expected a single sort column.") - - sortColsCB = GpuProjectExec.project(cb, - windowSpec.orderSpec.map(_.child.asInstanceOf[GpuExpression])) - aggregationColsCB = GpuProjectExec.project(cb, Seq(boundAggCol)) - - // Extract required columns columns. - groupingCols = GpuColumnVector.extractColumns(groupingColsCB) - sortCols = GpuColumnVector.extractColumns(sortColsCB) - aggregationCols = GpuColumnVector.extractColumns(aggregationColsCB) - - inputTable = new Table( ( groupingCols ++ sortCols ++ aggregationCols ).map(_.getBase) : _* ) - - aggResultTable = inputTable.groupBy(0 until groupingColsCB.numCols(): _*) - .aggregateWindowsOverTimeRanges( - GpuWindowExpression.getRangeBasedWindowFrame( - groupingColsCB.numCols() + sortColsCB.numCols(), - groupingColsCB.numCols(), - windowFunction, - windowSpec.frameSpecification.asInstanceOf[GpuSpecifiedWindowFrame], - windowSpec.orderSpec.head.isAscending - ) - ) - - val aggColumn = windowFunction match { - - // Special-case handling for COUNT(1)/COUNT(*): - // GpuCount aggregation expects to return LongType (INT64), - // but CUDF returns IntType (INT32) for COUNT() window function. - // Must cast back up to INT64. - case GpuAggregateExpression(GpuCount(_), _, _, _, _) => - aggResultTable.getColumn(0).castTo(DType.INT64) // Aggregation column is at index `0` - - case _ => - val origAggColumn = aggResultTable.getColumn(0) // Aggregation column is at index `0` - origAggColumn.incRefCount() - origAggColumn + val numGroupingColumns = windowSpec.partitionSpec.length + val numSortColumns = windowSpec.orderSpec.length + assert(numSortColumns == 1) + val totalExtraColumns = numGroupingColumns + numSortColumns + + val aggColumn = withResource(GpuProjectExec.project(cb, boundRangeProjectList)) { projected => + withResource(GpuColumnVector.from(projected)) { table => + val bases = GpuColumnVector.extractBases(projected).zipWithIndex + .slice(totalExtraColumns, boundRangeProjectList.length) + val agg = windowFunc.windowAggregation(bases) + .overWindow(GpuWindowExpression.getRangeBasedWindowOptions(windowFrameSpec, + windowSpec.orderSpec, + numGroupingColumns)) + withResource(table + .groupBy(0 until numGroupingColumns: _*) + .aggregateWindowsOverTimeRanges(agg)) { aggResultTable => + aggResultTable.getColumn(0).incRefCount() + } } - + } + val expectedType = GpuColumnVector.getRapidsType(windowFunc.dataType) + if (expectedType != aggColumn.getDataType) { + withResource(aggColumn) { aggColumn => + GpuColumnVector.from(aggColumn.castTo(expectedType)) + } + } else { GpuColumnVector.from(aggColumn) - } finally { - if (groupingColsCB != null) groupingColsCB.close() - if (sortColsCB != null) sortColsCB.close() - if (aggregationColsCB != null) aggregationColsCB.close() - if (inputTable != null) inputTable.close() - if (aggResultTable != null) aggResultTable.close() } } } object GpuWindowExpression { - def getRowBasedWindowFrame(columnIndex : Int, - aggExpression : Expression, - windowSpec : GpuSpecifiedWindowFrame) - : AggregationOverWindow = { - - // FIXME: Currently, only negative or 0 values are supported. - var lower = getBoundaryValue(windowSpec.lower) - if (lower > 0) { - throw new IllegalStateException( - s"Lower-bounds ahead of current row is not supported. Found $lower") + def getRowBasedLower(windowFrameSpec : GpuSpecifiedWindowFrame): Int = { + val lower = getBoundaryValue(windowFrameSpec.lower) + + // Translate the lower bound value to CUDF semantics: + // In spark 0 is the current row and lower bound is negative relative to that + // In CUDF the preceding window starts at the current row with 1 and up from there the + // further from the current row. + if (lower >= Int.MaxValue) { + Int.MinValue + } else if (lower <= Int.MinValue) { + Int.MaxValue + } else { + -(lower-1) } + } - // Now, translate the lower bound value to CUDF semantics: - // 1. CUDF requires lower bound value to include the current row. - // i.e. If Spark's lower bound == 3, CUDF's lower bound == 2. - // 2. Spark's lower_bound (preceding CURRENT ROW) as a negative offset. - // CUDF requires a positive number - // Note: UNBOUNDED PRECEDING implies lower == Int.MinValue, which needs special handling - // for negation. - // - // The following covers both requirements: - lower = Math.abs(lower-1) - - val upper = getBoundaryValue(windowSpec.upper) - if (upper < 0) { - throw new IllegalStateException( - s"Upper-bounds behind of current row is not supported. Found $upper") - } + def getRowBasedUpper(windowFrameSpec : GpuSpecifiedWindowFrame): Int = + getBoundaryValue(windowFrameSpec.upper) - val windowOption = WindowOptions.builder().minPeriods(1) - .window(lower, upper).build() + def getRowBasedWindowOptions(windowFrameSpec : GpuSpecifiedWindowFrame): WindowOptions = { + val lower = getRowBasedLower(windowFrameSpec) + val upper = getRowBasedUpper(windowFrameSpec) - val agg: Aggregation = aggExpression match { - case gpuAggregateExpression : GpuAggregateExpression => - gpuAggregateExpression.aggregateFunction match { - case GpuCount(_) => Aggregation.count() - case GpuSum(_) => Aggregation.sum() - case GpuMin(_) => Aggregation.min() - case GpuMax(_) => Aggregation.max() - case anythingElse => - throw new UnsupportedOperationException( - s"Unsupported aggregation: ${anythingElse.prettyName}") - } - case _: GpuRowNumber => - // ROW_NUMBER does not depend on input column values, but it still should be fine - Aggregation.rowNumber() - case anythingElse => - throw new UnsupportedOperationException( - s"Unsupported window aggregation: ${anythingElse.prettyName}") - } - agg.onColumn(columnIndex).overWindow(windowOption) + WindowOptions.builder().minPeriods(1) + .window(lower, upper).build() } - def getRangeBasedWindowFrame(aggColumnIndex : Int, - timeColumnIndex : Int, - aggExpression : Expression, - windowSpec : GpuSpecifiedWindowFrame, - timestampIsAscending : Boolean) - : AggregationOverWindow = { - + def getRangeBasedLower(windowFrameSpec : GpuSpecifiedWindowFrame): Int = { // FIXME: Currently, only negative or 0 values are supported. - var lower = getBoundaryValue(windowSpec.lower) + val lower = getBoundaryValue(windowFrameSpec.lower) if (lower > 0) { throw new IllegalStateException( s"Lower-bounds ahead of current row is not supported. Found: $lower") @@ -312,42 +332,41 @@ object GpuWindowExpression { // CUDF requires a positive offset. // Note: UNBOUNDED PRECEDING implies lower == Int.MinValue, which needs special handling // for negation. - lower = if (lower == Int.MinValue) Int.MaxValue else Math.abs(lower) + if (lower == Int.MinValue) { + Int.MaxValue + } else { + Math.abs(lower) + } + } - val upper = getBoundaryValue(windowSpec.upper) - if(upper < 0) { + def getRangeBasedUpper(windowFrameSpec : GpuSpecifiedWindowFrame): Int = { + val upper = getBoundaryValue(windowFrameSpec.upper) + if (upper < 0) { throw new IllegalStateException( s"Upper-bounds behind current row is not supported. Found: $upper") } + upper + } - val windowOptionBuilder = WindowOptions.builder() - .minPeriods(1) - .window(lower,upper) - .timestampColumnIndex(timeColumnIndex) - if (timestampIsAscending) { + def getRangeBasedWindowOptions( + windowFrameSpec : GpuSpecifiedWindowFrame, + orderSpec: Seq[SortOrder], + timeColumnIndex : Int): WindowOptions = { + val lower = getRangeBasedLower(windowFrameSpec) + val upper = getRangeBasedUpper(windowFrameSpec) + + val windowOptionBuilder = WindowOptions.builder().minPeriods(1) + .window(lower, upper) + .timestampColumnIndex(timeColumnIndex) + + // TODO should we check this out? + if (orderSpec.head.isAscending) { windowOptionBuilder.timestampAscending() - } - else { + } else { windowOptionBuilder.timestampDescending() } - val windowOption = windowOptionBuilder.build() - - val agg: Aggregation = aggExpression match { - case gpuAggExpression : GpuAggregateExpression => gpuAggExpression.aggregateFunction match { - case GpuCount(_) => Aggregation.count() - case GpuSum(_) => Aggregation.sum() - case GpuMin(_) => Aggregation.min() - case GpuMax(_) => Aggregation.max() - case anythingElse => - throw new UnsupportedOperationException( - s"Unsupported aggregation: ${anythingElse.prettyName}") - } - case anythingElse => - throw new UnsupportedOperationException( - s"Unsupported window aggregation: ${anythingElse.prettyName}") - } - agg.onColumn(aggColumnIndex).overWindow(windowOption) + windowOptionBuilder.build() } def getBoundaryValue(boundary : Expression) : Int = boundary match { @@ -529,10 +548,9 @@ class GpuSpecifiedWindowFrameMeta( willNotWorkOnGpu(s"Literal Lower-bound of ROWS window-frame must be of INT type. " + s"Found ${literal.dataType}") } - else if (literal.value.asInstanceOf[Int] > 0) { - willNotWorkOnGpu(s"Lower-bounds ahead of current row is not supported. " + - s"Found ${literal.value}") - } + // We don't support a lower bound > 0 except for lead/lag where it is required + // That check is done in GpuWindowExpressionMeta where it knows what type of operation + // is being done case UnboundedPreceding => case CurrentRow => case _ => @@ -547,17 +565,15 @@ class GpuSpecifiedWindowFrameMeta( willNotWorkOnGpu(s"Literal Upper-bound of ROWS window-frame must be of INT type. " + s"Found ${literal.dataType}") } - else if (literal.value.asInstanceOf[Int] < 0) { - willNotWorkOnGpu(s"Upper-bounds behind of current row is not supported. " + - s"Found ${literal.value}") - } + // We don't support a upper bound < 0 except for lead/lag where it is required + // That check is done in GpuWindowExpressionMeta where it knows what type of operation + // is being done case UnboundedFollowing => case CurrentRow => case _ => willNotWorkOnGpu(s"Upper-bound of ROWS window-frame must be an INT literal," + s"UNBOUNDED FOLLOWING, or CURRENT ROW. " + s"Found unexpected bound: ${windowFrame.upper.prettyName}") } - } } @@ -613,8 +629,8 @@ case class GpuSpecifiedWindowFrame( case (l: GpuExpression, u: GpuExpression) if !isValidFrameBoundary(l, u) => TypeCheckFailure(s"Window frame upper bound '$upper' does not follow the lower bound " + s"'$lower'.") - case (l: GpuSpecialFrameBoundary, _) => TypeCheckSuccess - case (_, u: GpuSpecialFrameBoundary) => TypeCheckSuccess + case (_: GpuSpecialFrameBoundary, _) => TypeCheckSuccess + case (_, _: GpuSpecialFrameBoundary) => TypeCheckSuccess case (l: GpuExpression, u: GpuExpression) if l.dataType != u.dataType => TypeCheckFailure( s"Window frame bounds '$lower' and '$upper' do no not have the same data type: " + @@ -706,11 +722,26 @@ case class GpuSpecialFrameBoundary(boundary : SpecialFrameBoundary) } } -// GPU Counterpart of AggregateWindowFunction. -// All windowing specific functions are expected to extend from this. -trait GpuAggregateWindowFunction extends GpuDeclarativeAggregate with GpuUnevaluable { - override lazy val mergeExpressions: Seq[GpuExpression] - = throw new UnsupportedOperationException("Window Functions do not support merging.") +/** + * GPU Counterpart of `AggregateWindowFunction`. + * On the CPU this would extend `DeclarativeAggregate` and use the provided methods + * to build up the expressions need to produce a result. For window operations we do it + * in a single pass, where all of the data is available so instead we have out own set of + * expressions. + */ +trait GpuAggregateWindowFunction extends GpuUnevaluable { + /** + * Using child references, define the shape of the vectors sent to the window operations + */ + val windowInputProjection: Seq[Expression] + + /** + * Create the aggregation operation to perform for Windowing. The input to this method + * is a sequence of (index, ColumnVector) that corresponds one to one with what was + * returned by [[windowInputProjection]]. The index is the index into the Table for the + * corresponding ColumnVector. Some aggregations need extra values. + */ + def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn } case class GpuRowNumber() extends GpuAggregateWindowFunction { @@ -718,14 +749,93 @@ case class GpuRowNumber() extends GpuAggregateWindowFunction { override def dataType: DataType = IntegerType override def children: Seq[Expression] = Nil - protected val zero: GpuLiteral = GpuLiteral(0, IntegerType) - protected val one : GpuLiteral = GpuLiteral(1, IntegerType) - - protected val rowNumber : AttributeReference = - AttributeReference("rowNumber", IntegerType)() - override def aggBufferAttributes: Seq[AttributeReference] = rowNumber :: Nil - override val initialValues: Seq[GpuExpression] = zero :: Nil - override val updateExpressions: Seq[Expression] = rowNumber :: one :: Nil - override val evaluateExpression: Expression = rowNumber - override val inputProjection: Seq[GpuExpression] = Nil + + override val windowInputProjection: Seq[Expression] = Nil + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = { + assert(inputs.isEmpty, inputs) + Aggregation.rowNumber().onColumn(0) + } +} + +abstract class OffsetWindowFunctionMeta[INPUT <: OffsetWindowFunction] ( + expr: INPUT, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: ConfKeysAndIncompat) + extends ExprMeta[INPUT](expr, conf, parent, rule) { + + val input: BaseExprMeta[_] = GpuOverrides.wrapExpr(expr.input, conf, Some(this)) + val offset: BaseExprMeta[_] = GpuOverrides.wrapExpr(expr.offset, conf, Some(this)) + val default: BaseExprMeta[_] = GpuOverrides.wrapExpr(expr.default, conf, Some(this)) + override val childExprs: Seq[BaseExprMeta[_]] = + if (expr.default.dataType == NullType) { + // We don't support NullType, except for this one case... + Seq(input, offset) + } else { + Seq(input, offset, default) + } + + override def tagExprForGpu(): Unit = { + expr.input.dataType match { + case StringType => willNotWorkOnGpu("Strings are not currently supported as input") + case _ => // Good + } + } +} + +case class GpuLead(input: Expression, offset: Expression, default: Expression) + extends GpuAggregateWindowFunction { + private val parsedOffset = offset match { + case GpuLiteral(o: Int, IntegerType) => o + case other => + throw new IllegalStateException(s"$other is not a supported offset type") + } + override def nullable: Boolean = default == null || default.nullable || input.nullable + override def dataType: DataType = input.dataType + + override def children: Seq[Expression] = Seq(input, offset, default) + + override val windowInputProjection: Seq[Expression] = default match { + case GpuLiteral(v, _) if v == null => Seq(input) + case _ => Seq(input, default) + } + + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = { + val in = inputs.toArray + if (in.length > 1) { + // Has a default + Aggregation.lead(parsedOffset, in(1)._1).onColumn(in.head._2) + } else { + Aggregation.lead(parsedOffset).onColumn(in.head._2) + } + } +} + +case class GpuLag(input: Expression, offset: Expression, default: Expression) + extends GpuAggregateWindowFunction { + // TODO a lot of this should be common + private val parsedOffset = offset match { + case GpuLiteral(o: Int, IntegerType) => o + case other => + throw new IllegalStateException(s"$other is not a supported offset type") + } + override def nullable: Boolean = default == null || default.nullable || input.nullable + override def dataType: DataType = input.dataType + + override def children: Seq[Expression] = Seq(input, offset, default) + + override val windowInputProjection: Seq[Expression] = default match { + case GpuLiteral(v, _) if v == null => Seq(input) + case _ => Seq(input, default) + } + + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = { + val in = inputs.toArray + if (in.length > 1) { + // Has a default + Aggregation.lag(parsedOffset, in(1)._1).onColumn(in.head._2) + } else { + Aggregation.lag(parsedOffset).onColumn(in.head._2) + } + } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 90f35380980..ec1db228972 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.rapids import ai.rapids.cudf -import ai.rapids.cudf.Aggregation +import ai.rapids.cudf.{Aggregation, AggregationOnColumn, ColumnVector} import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -277,7 +277,8 @@ abstract class GpuDeclarativeAggregate extends GpuAggregateFunction with GpuUnev aggBufferAttributes.map(_.newInstance()) } -case class GpuMin(child: Expression) extends GpuDeclarativeAggregate { +case class GpuMin(child: Expression) extends GpuDeclarativeAggregate + with GpuAggregateWindowFunction { private lazy val cudfMin = AttributeReference("min", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) @@ -295,9 +296,15 @@ case class GpuMin(child: Expression) extends GpuDeclarativeAggregate { override def children: Seq[Expression] = child :: Nil override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForOrderingExpr(child.dataType, "function gpu min") + + // WINDOW FUNCTION + override lazy val windowInputProjection: Seq[Expression] = inputProjection + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = + Aggregation.min().onColumn(inputs.head._2) } -case class GpuMax(child: Expression) extends GpuDeclarativeAggregate { +case class GpuMax(child: Expression) extends GpuDeclarativeAggregate + with GpuAggregateWindowFunction { private lazy val cudfMax = AttributeReference("max", child.dataType)() override lazy val inputProjection: Seq[Expression] = Seq(child) @@ -315,10 +322,15 @@ case class GpuMax(child: Expression) extends GpuDeclarativeAggregate { override def children: Seq[Expression] = child :: Nil override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForOrderingExpr(child.dataType, "function gpu max") + + // WINDOW FUNCTION + override lazy val windowInputProjection: Seq[Expression] = inputProjection + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = + Aggregation.max().onColumn(inputs.head._2) } case class GpuSum(child: Expression) - extends GpuDeclarativeAggregate with ImplicitCastInputTypes { + extends GpuDeclarativeAggregate with ImplicitCastInputTypes with GpuAggregateWindowFunction { private lazy val resultType = child.dataType match { case _: DoubleType => DoubleType case _ => LongType @@ -342,9 +354,15 @@ case class GpuSum(child: Expression) override def inputTypes: Seq[AbstractDataType] = Seq(NumericType) override def checkInputDataTypes(): TypeCheckResult = TypeUtils.checkForNumericExpr(child.dataType, "function gpu sum") + + // WINDOW FUNCTION + override lazy val windowInputProjection: Seq[Expression] = inputProjection + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = + Aggregation.sum().onColumn(inputs.head._2) } -case class GpuCount(children: Seq[Expression]) extends GpuDeclarativeAggregate { +case class GpuCount(children: Seq[Expression]) extends GpuDeclarativeAggregate + with GpuAggregateWindowFunction { // counts are Long private lazy val cudfCount = AttributeReference("count", LongType)() @@ -360,6 +378,13 @@ case class GpuCount(children: Seq[Expression]) extends GpuDeclarativeAggregate { // Copied from Count override def nullable: Boolean = false override def dataType: DataType = LongType + + // WINDOW FUNCTION + // countDistinct is not supported for window functions in spark right now. + // we could support it by doing an `Aggregation.nunique(false)` + override lazy val windowInputProjection: Seq[Expression] = inputProjection + override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = + Aggregation.count(false).onColumn(inputs.head._2) } case class GpuAverage(child: Expression) extends GpuDeclarativeAggregate { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala index 0d4d102997d..9556aa047fc 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/WindowFunctionSuite.scala @@ -39,6 +39,12 @@ class WindowFunctionSuite extends SparkQueryCompareTestSuite { count("*").over(windowSpec) ) + testSparkResultsAreEqual("[Window] [ROWS/RANGE] [default] ", windowTestDfOrc) { + val rowsWindow = Window.partitionBy("uid") + .orderBy("dateLong") + windowAggregationTester(rowsWindow) + } + testSparkResultsAreEqual("[Window] [ROWS] [-2, 3] ", windowTestDfOrc) { val rowsWindow = Window.partitionBy("uid") .orderBy("dateLong") From ddc01819bc1f1a9604c7da4e28389c8657b4a36b Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 8 Oct 2020 09:56:23 -0500 Subject: [PATCH 2/4] Cleaned up comments and TODOs --- .../src/main/python/window_function_test.py | 18 +++++---- .../spark/rapids/GpuWindowExpression.scala | 39 ++++++++----------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 294a6747fda..7203cc3c080 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -89,10 +89,12 @@ def test_multi_types_window_aggs_for_rows_lead_lag(a_gen, b_gen, c_gen): ('a', RepeatSeqGen(a_gen, length=20)), ('b', b_gen), ('c', c_gen)] - # TODO range based appears to be the default without rowsBetween - # and fails horribly even when there is no rangeBetween - # ordering needs to include c because with nulls and especially on booleans - # it is possible to get a different ordering when it is ambiguous + # By default for many operations a range of unbounded to unbounded is used + # This will not work until https://github.com/NVIDIA/spark-rapids/issues/216 + # is fixed. + + # Ordering needs to include c because with nulls and especially on booleans + # it is possible to get a different ordering when it is ambiguous. baseWindowSpec = Window.partitionBy('a').orderBy('b', 'c') inclusiveWindowSpec = baseWindowSpec.rowsBetween(-10, 100) @@ -124,9 +126,11 @@ def test_multi_types_window_aggs_for_rows(a_gen, b_gen, c_gen): ('a', RepeatSeqGen(a_gen, length=20)), ('b', b_gen), ('c', c_gen)] - # TODO range based appears to be the default without rowsBetween - # and fails horribly even when there is no rangeBetween - # ordering needs to include c because with nulls and especially on booleans + # By default for many operations a range of unbounded to unbounded is used + # This will not work until https://github.com/NVIDIA/spark-rapids/issues/216 + # is fixed. + + # Ordering needs to include c because with nulls and especially on booleans # it is possible to get a different ordering when it is ambiguous baseWindowSpec = Window.partitionBy('a').orderBy('b', 'c') inclusiveWindowSpec = baseWindowSpec.rowsBetween(-10, 100) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 47a1a6675c0..c484321c8eb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -76,8 +76,9 @@ class GpuWindowExpressionMeta( aggregateExpression.aggregateFunction match { // Count does not work in these cases because of a bug in cudf where a rolling count // does not do the correct thing for null entries - // TODO link to issue. - // Once that is fixed this can be deleted and the check will fall through. + // Once https://github.com/rapidsai/cudf/issues/6343 + // is fixed this can be deleted and the check will go to the next case + // where it will match and pass. case Count(exp) => if (!exp.forall(x => x.isInstanceOf[Literal])) { willNotWorkOnGpu(s"Currently, only COUNT(1) and COUNT(*) are supported. " + @@ -359,7 +360,9 @@ object GpuWindowExpression { .window(lower, upper) .timestampColumnIndex(timeColumnIndex) - // TODO should we check this out? + // We only support a single time based column to order by right now, so just verify + // that it is correct. + assert(orderSpec.length == 1) if (orderSpec.head.isAscending) { windowOptionBuilder.timestampAscending() } else { @@ -783,9 +786,12 @@ abstract class OffsetWindowFunctionMeta[INPUT <: OffsetWindowFunction] ( } } -case class GpuLead(input: Expression, offset: Expression, default: Expression) - extends GpuAggregateWindowFunction { - private val parsedOffset = offset match { +trait GpuOffsetWindowFunction extends GpuAggregateWindowFunction { + protected val input: Expression + protected val offset: Expression + protected val default: Expression + + protected val parsedOffset: Int = offset match { case GpuLiteral(o: Int, IntegerType) => o case other => throw new IllegalStateException(s"$other is not a supported offset type") @@ -799,6 +805,10 @@ case class GpuLead(input: Expression, offset: Expression, default: Expression) case GpuLiteral(v, _) if v == null => Seq(input) case _ => Seq(input, default) } +} + +case class GpuLead(input: Expression, offset: Expression, default: Expression) + extends GpuOffsetWindowFunction { override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = { val in = inputs.toArray @@ -812,22 +822,7 @@ case class GpuLead(input: Expression, offset: Expression, default: Expression) } case class GpuLag(input: Expression, offset: Expression, default: Expression) - extends GpuAggregateWindowFunction { - // TODO a lot of this should be common - private val parsedOffset = offset match { - case GpuLiteral(o: Int, IntegerType) => o - case other => - throw new IllegalStateException(s"$other is not a supported offset type") - } - override def nullable: Boolean = default == null || default.nullable || input.nullable - override def dataType: DataType = input.dataType - - override def children: Seq[Expression] = Seq(input, offset, default) - - override val windowInputProjection: Seq[Expression] = default match { - case GpuLiteral(v, _) if v == null => Seq(input) - case _ => Seq(input, default) - } + extends GpuOffsetWindowFunction { override def windowAggregation(inputs: Seq[(ColumnVector, Int)]): AggregationOnColumn = { val in = inputs.toArray From 49b630650feaac1fe769376086caeb3e903bb847 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 8 Oct 2020 10:41:23 -0500 Subject: [PATCH 3/4] Addressed review comments --- integration_tests/src/main/python/window_function_test.py | 2 +- .../scala/com/nvidia/spark/rapids/GpuWindowExpression.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 7203cc3c080..6b8a4d4b4d0 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -114,7 +114,7 @@ def do_it(spark): .withColumn('row_num', f.row_number().over(baseWindowSpec)) assert_gpu_and_cpu_are_equal_collect(do_it, conf={'spark.rapids.sql.hasNans': 'false'}) -# lead and lag don't currently work for strig columns, so redo the tests, but just for strings +# lead and lag don't currently work for string columns, so redo the tests, but just for strings # without lead and lag @ignore_order @approximate_float diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index c484321c8eb..628a853d349 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -84,7 +84,8 @@ class GpuWindowExpressionMeta( willNotWorkOnGpu(s"Currently, only COUNT(1) and COUNT(*) are supported. " + s"COUNT($exp) is not supported in windowing.") } - // Sadly we have to white list the aggregations because they do not all work + // Sadly not all aggregations work for window operations yet, os explicitly allow the + // ones that do work. case Count(_) | Sum(_) | Min(_) | Max(_) => // Supported. case other: AggregateFunction => willNotWorkOnGpu(s"AggregateFunction ${other.prettyName} " + From fbc0ff5c5d0a42bbf6156bcc81bddce9dba02dd6 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 8 Oct 2020 11:10:57 -0500 Subject: [PATCH 4/4] Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala Co-authored-by: Jason Lowe --- .../scala/com/nvidia/spark/rapids/GpuWindowExpression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala index 628a853d349..70757265ecc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala @@ -84,7 +84,7 @@ class GpuWindowExpressionMeta( willNotWorkOnGpu(s"Currently, only COUNT(1) and COUNT(*) are supported. " + s"COUNT($exp) is not supported in windowing.") } - // Sadly not all aggregations work for window operations yet, os explicitly allow the + // Sadly not all aggregations work for window operations yet, so explicitly allow the // ones that do work. case Count(_) | Sum(_) | Min(_) | Max(_) => // Supported. case other: AggregateFunction =>