From 91b5d30dce78b40542082e88f229583316e29f43 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Tue, 12 Sep 2023 16:04:02 -0500
Subject: [PATCH] Add in unbounded to unbounded optimization for min/max

Signed-off-by: Robert (Bobby) Evans
---
 .../src/main/python/window_function_test.py   | 36 ++++++++++++++
 .../spark/rapids/GpuWindowExpression.scala    | 49 +++++++++++++++++++
 .../spark/sql/rapids/AggregateFunctions.scala | 10 ++++
 3 files changed, 95 insertions(+)

diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py
index 07ee61cd1a3..6c81fcb804e 100644
--- a/integration_tests/src/main/python/window_function_test.py
+++ b/integration_tests/src/main/python/window_function_test.py
@@ -444,6 +444,42 @@ def test_range_windows_with_string_order_by_column(data_gen, batch_size):
         ' FROM window_agg_table ',
         conf={'spark.rapids.sql.batchSizeBytes': batch_size})
 
+# This is for aggregations that work with the unbounded to unbounded window optimization.
+# They don't need to be batched specially, but it only works if all of the aggregations can support this.
+# The order returned should be consistent because the data ends up in a single task (no partitioning).
+@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
+def test_window_batched_unbounded_no_part(b_gen, batch_size):
+    conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
+            'spark.rapids.sql.castFloatToDecimal.enabled': True}
+    query_parts = ['min(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as min_col',
+                   'max(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_col']
+
+    assert_gpu_and_cpu_are_equal_sql(
+        lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
+        "window_agg_table",
+        'select ' +
+        ', '.join(query_parts) +
+        ' from window_agg_table ',
+        validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'],
+        conf = conf)
+
+@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
+def test_window_batched_unbounded(b_gen, batch_size):
+    conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
+            'spark.rapids.sql.castFloatToDecimal.enabled': True}
+    query_parts = ['min(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as min_col',
+                   'max(b) over (partition by a % 2 order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_col']
+
+    assert_gpu_and_cpu_are_equal_sql(
+        lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
+        "window_agg_table",
+        'select ' +
+        ', '.join(query_parts) +
+        ' from window_agg_table ',
+        validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'],
+        conf = conf)
 
 # This is for aggregations that work with a running window optimization. They don't need to be batched
 # specially, but it only works if all of the aggregations can support this.
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
index 98e47d93f42..974647d1046 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExpression.scala
@@ -994,6 +994,55 @@ class CountUnboundedToUnboundedFixer(errorOnOverflow: Boolean)
   }
 }
 
+class BatchedUnboundedToUnboundedBinaryFixer(val binOp: BinaryOp, val dataType: DataType)
+    extends BatchedUnboundedToUnboundedWindowFixer {
+  private var previousResult: Option[Scalar] = None
+
+  override def updateState(scalar: Scalar): Unit = previousResult match {
+    case None =>
+      previousResult = Some(scalar.incRefCount())
+    case Some(prev) =>
+      // This is ugly, but for now it is simple to make it work
+      val result = withResource(ColumnVector.fromScalar(prev, 1)) { p1 =>
+        withResource(p1.binaryOp(binOp, scalar, prev.getType)) { result1 =>
+          result1.getScalarElement(0)
+        }
+      }
+      closeOnExcept(result) { _ =>
+        previousResult.foreach(_.close())
+        previousResult = Some(result)
+      }
+  }
+
+  override def fixUp(samePartitionMask: Either[ColumnVector, Boolean],
+      column: ColumnVector): ColumnVector = {
+    val scalar = previousResult match {
+      case Some(value) =>
+        value.incRefCount()
+      case None =>
+        GpuScalar.from(null, dataType)
+    }
+
+    withResource(scalar) { scalar =>
+      samePartitionMask match {
+        case scala.Left(cv) =>
+          cv.ifElse(scalar, column)
+        case scala.Right(true) =>
+          ColumnVector.fromScalar(scalar, column.getRowCount.toInt)
+        case _ =>
+          column.incRefCount()
+      }
+    }
+  }
+
+  override def close(): Unit = reset()
+
+  override def reset(): Unit = {
+    previousResult.foreach(_.close())
+    previousResult = None
+  }
+}
+
 /**
  * This class fixes up batched running windows by performing a binary op on the previous value and
  * those in the same partition by key group. It does not deal with nulls, so it works for things
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
index bc3a5b3ea9e..78811fef08e 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala
@@ -573,6 +573,7 @@ object GpuMin{
 abstract class GpuMin(child: Expression) extends GpuAggregateFunction
     with GpuBatchedRunningWindowWithFixer
+    with GpuUnboundToUnboundWindowWithFixer
     with GpuAggregateWindowFunction
     with GpuRunningWindowFunction
     with Serializable {
@@ -602,6 +603,10 @@ abstract class GpuMin(child: Expression) extends GpuAggregateFunction
   override def newFixer(): BatchedRunningWindowFixer =
     new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min")
 
+  // UNBOUNDED TO UNBOUNDED WINDOW
+  override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer =
+    new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MIN, dataType)
+
   override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
     inputProjection
 
@@ -752,6 +757,7 @@ object GpuMax {
 abstract class GpuMax(child: Expression) extends GpuAggregateFunction
     with GpuBatchedRunningWindowWithFixer
+    with GpuUnboundToUnboundWindowWithFixer
     with GpuAggregateWindowFunction
     with GpuRunningWindowFunction
     with Serializable {
@@ -781,6 +787,10 @@ abstract class GpuMax(child: Expression) extends GpuAggregateFunction
   override def newFixer(): BatchedRunningWindowFixer =
     new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max")
 
+  // UNBOUNDED TO UNBOUNDED WINDOW
+  override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer =
+    new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MAX, dataType)
+
   override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
     inputProjection
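
Note: the fixer interface above implies a two-pass flow under GpuCachedDoublePassWindowExec:
pass one folds each cached batch's partial min/max into a single scalar through updateState,
and pass two rewrites every cached batch through fixUp. The sketch below is editorial, not
part of the patch; `batches` (the cached per-batch column of one task) and `reduceMin` (a
per-batch min reduction) are hypothetical stand-ins, and the real exec also passes a
Left(mask) to fixUp when a batch spans partition groups.

    // Sketch only: `batches` and `reduceMin` are illustrative, not plugin APIs.
    val fixer = new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MIN, LongType)
    // Pass 1: fold each batch's local minimum into the fixer's single-scalar state.
    batches.foreach { batch =>
      withResource(reduceMin(batch)) { localMin =>
        fixer.updateState(localMin)
      }
    }
    // Pass 2: replay the batches. Right(true) marks the whole batch as belonging to
    // the current partition group, so every row is replaced by the final minimum.
    val fixed = batches.map(batch => fixer.fixUp(scala.Right(true), batch))
    fixer.close()

Because BinaryOp.NULL_MIN and NULL_MAX return the non-null operand when the other side is
null, the folded scalar stays correct even when a batch's partial result is null, which is
why the same fixer class serves both GpuMin and GpuMax with only the BinaryOp swapped.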