Add in unbounded to unbounded optimization for min/max (#9228)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Sep 19, 2023
1 parent 8dfe2a1 commit d7f58ea
Showing 3 changed files with 95 additions and 0 deletions.
integration_tests/src/main/python/window_function_test.py (36 additions, 0 deletions)
@@ -444,6 +444,42 @@ def test_range_windows_with_string_order_by_column(data_gen, batch_size):
        ' FROM window_agg_table ',
        conf={'spark.rapids.sql.batchSizeBytes': batch_size})

# This is for aggregations that work with the unbounded-to-unbounded window optimization.
# They don't need to be batched specially, but the optimization only applies if all of the
# aggregations in the window support it. The order returned should be consistent because the
# data ends up in a single task (no partitioning). A CPU-side sketch of the query shape
# follows the two tests below.
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)  # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
def test_window_batched_unbounded_no_part(b_gen, batch_size):
    conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
            'spark.rapids.sql.castFloatToDecimal.enabled': True}
    query_parts = ['min(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as min_col',
                   'max(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_col']

    assert_gpu_and_cpu_are_equal_sql(
        lambda spark: two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
        "window_agg_table",
        'select ' +
        ', '.join(query_parts) +
        ' from window_agg_table ',
        validate_execs_in_gpu_plan=['GpuCachedDoublePassWindowExec'],
        conf=conf)

@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)  # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
def test_window_batched_unbounded(b_gen, batch_size):
    conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
            'spark.rapids.sql.castFloatToDecimal.enabled': True}
    query_parts = ['min(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as min_col',
                   'max(b) over (partition by a % 2 order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_col']

    assert_gpu_and_cpu_are_equal_sql(
        lambda spark: two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
        "window_agg_table",
        'select ' +
        ', '.join(query_parts) +
        ' from window_agg_table ',
        validate_execs_in_gpu_plan=['GpuCachedDoublePassWindowExec'],
        conf=conf)
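
As referenced in the comment above, here is a minimal CPU-side sketch of the query shape these tests exercise, assuming only a stock Spark session in local mode; the object name, table contents, and values are illustrative and are not part of the test harness.

import org.apache.spark.sql.SparkSession

// Illustrative only: shows the semantics the GPU plan must reproduce.
object UnboundedWindowExample extends App {
  val spark = SparkSession.builder()
    .appName("unbounded-to-unbounded-example")
    .master("local[*]")
    .getOrCreate()
  import spark.implicits._

  Seq((1L, 10), (2L, 5), (3L, 7)).toDF("a", "b")
    .createOrReplaceTempView("window_agg_table")

  // With no PARTITION BY, every row sees all rows, so min_col is 5 and
  // max_col is 10 on every output row.
  spark.sql(
    """SELECT a, b,
      |  min(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS min_col,
      |  max(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_col
      |FROM window_agg_table""".stripMargin).show()
}

The second test above gives the max aggregation a partition by a % 2 clause, which exercises the partitioned path of the same optimization.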

# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this.
@@ -994,6 +994,55 @@ class CountUnboundedToUnboundedFixer(errorOnOverflow: Boolean)
  }
}

class BatchedUnboundedToUnboundedBinaryFixer(val binOp: BinaryOp, val dataType: DataType)
    extends BatchedUnboundedToUnboundedWindowFixer {
  private var previousResult: Option[Scalar] = None

  override def updateState(scalar: Scalar): Unit = previousResult match {
    case None =>
      previousResult = Some(scalar.incRefCount())
    case Some(prev) =>
      // This is ugly, but for now it is simple to make it work
      val result = withResource(ColumnVector.fromScalar(prev, 1)) { p1 =>
        withResource(p1.binaryOp(binOp, scalar, prev.getType)) { result1 =>
          result1.getScalarElement(0)
        }
      }
      closeOnExcept(result) { _ =>
        previousResult.foreach(_.close())
        previousResult = Some(result)
      }
  }

  override def fixUp(samePartitionMask: Either[ColumnVector, Boolean],
      column: ColumnVector): ColumnVector = {
    val scalar = previousResult match {
      case Some(value) =>
        value.incRefCount()
      case None =>
        GpuScalar.from(null, dataType)
    }

    withResource(scalar) { scalar =>
      samePartitionMask match {
        case scala.Left(cv) =>
          cv.ifElse(scalar, column)
        case scala.Right(true) =>
          ColumnVector.fromScalar(scalar, column.getRowCount.toInt)
        case _ =>
          column.incRefCount()
      }
    }
  }

  override def close(): Unit = reset()

  override def reset(): Unit = {
    previousResult.foreach(_.close())
    previousResult = None
  }
}
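
To make the two-pass contract concrete, here is a hedged sketch of a driver loop. The real caller is GpuCachedDoublePassWindowExec (validated by the tests above); the flow below is an assumption for illustration, not the actual exec code.

import ai.rapids.cudf.{BinaryOp, ColumnVector, Scalar}
import org.apache.spark.sql.types.IntegerType

object DoublePassSketch extends App {
  val fixer = new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MIN, IntegerType)
  try {
    // Pass 1: fold each cached batch's partial min into the fixer's state.
    // updateState only borrows the scalar (it incRefCounts what it keeps),
    // so the caller still closes its own reference.
    Seq(7, 3, 9).foreach { partial =>
      val s = Scalar.fromInt(partial)
      try fixer.updateState(s) finally s.close()
    }
    // Pass 2: replay a cached batch of partial results. Right(true) marks the
    // whole batch as one partition, so every row becomes the final min, 3.
    val partials = ColumnVector.fromInts(7, 3, 9)
    try {
      val fixed = fixer.fixUp(scala.Right(true), partials)
      fixed.close() // a real driver would hand this column downstream
    } finally {
      partials.close()
    }
  } finally {
    fixer.close()
  }
}

The Left(cv) arm of fixUp covers a batch that straddles a partition boundary: rows flagged true in the mask take the accumulated value, while the remaining rows keep the column's existing results.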

/**
 * This class fixes up batched running windows by performing a binary op on the previous value and
 * those in the same partition-by key group. It does not deal with nulls, so it works for things
@@ -573,6 +573,7 @@ object GpuMin{

abstract class GpuMin(child: Expression) extends GpuAggregateFunction
    with GpuBatchedRunningWindowWithFixer
    with GpuUnboundToUnboundWindowWithFixer
    with GpuAggregateWindowFunction
    with GpuRunningWindowFunction
    with Serializable {
@@ -602,6 +603,10 @@ abstract class GpuMin(child: Expression) extends GpuAggregateFunction
  override def newFixer(): BatchedRunningWindowFixer =
    new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min")

  // UNBOUNDED TO UNBOUNDED WINDOW
  override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer =
    new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MIN, dataType)

  override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
    inputProjection

@@ -752,6 +757,7 @@ object GpuMax {

abstract class GpuMax(child: Expression) extends GpuAggregateFunction
    with GpuBatchedRunningWindowWithFixer
    with GpuUnboundToUnboundWindowWithFixer
    with GpuAggregateWindowFunction
    with GpuRunningWindowFunction
    with Serializable {
@@ -781,6 +787,10 @@ abstract class GpuMax(child: Expression) extends GpuAggregateFunction
  override def newFixer(): BatchedRunningWindowFixer =
    new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max")

  // UNBOUNDED TO UNBOUNDED WINDOW
  override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer =
    new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MAX, dataType)

  override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
    inputProjection
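
Both GpuMin and GpuMax wire in the shared fixer with the matching null-aware binary op. Judging from this diff alone, the trait contract appears to be a single factory method; a minimal sketch of that shape follows (the real trait is defined elsewhere in the plugin and may declare more than this):

trait GpuUnboundToUnboundWindowWithFixer {
  def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer
}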

