Add in unbounded to unbounded optimization for min/max #9228

Merged
merged 1 commit on Sep 19, 2023
36 changes: 36 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
@@ -444,6 +444,42 @@ def test_range_windows_with_string_order_by_column(data_gen, batch_size):
' FROM window_agg_table ',
conf={'spark.rapids.sql.batchSizeBytes': batch_size})

# This is for aggregations that support the unbounded-to-unbounded window optimization.
# They don't need special batching, but the optimization only applies if every aggregation
# in the query supports it. The order of the returned rows should be consistent because the
# data ends up in a single task (no partitioning). A standalone sketch of the frame's
# semantics follows this test.
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
def test_window_batched_unbounded_no_part(b_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
query_parts = ['min(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as min_col',
'max(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_col']
Comment on lines +455 to +456

Collaborator:
That's a neat way of phrasing the query. I'll note this for next time.

Collaborator (Author):
I stole it from test_window_running_no_part just below this on line 492.


assert_gpu_and_cpu_are_equal_sql(
lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
"window_agg_table",
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'],
conf = conf)
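
For reference: an UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING frame assigns every row the aggregate of its entire partition, so min/max can be computed once per partition and broadcast to all of its rows. A minimal sketch of that semantics in plain Scala (illustrative only, not the plugin's code):

// Semantics of min(b) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):
// every row in a partition receives the same partition-wide minimum.
val rows = Seq(("a", 3), ("a", 1), ("b", 7), ("b", 5)) // (partition key, b)
val minPerPart = rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).min }
val windowed = rows.map { case (k, b) => (k, b, minPerPart(k)) }
// windowed == Seq(("a",3,1), ("a",1,1), ("b",7,5), ("b",5,5))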

@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
@pytest.mark.parametrize('b_gen', all_basic_gens + [decimal_gen_32bit, decimal_gen_128bit], ids=meta_idfn('data:'))
def test_window_batched_unbounded(b_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size,
'spark.rapids.sql.castFloatToDecimal.enabled': True}
query_parts = ['min(b) over (order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as min_col',
'max(b) over (partition by a % 2 order by a rows between UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_col']

assert_gpu_and_cpu_are_equal_sql(
lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
"window_agg_table",
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'],
conf = conf)

# This is for aggregations that work with a running window optimization. They don't need to be batched
# specially, but it only works if all of the aggregations can support this.
@@ -994,6 +994,55 @@ class CountUnboundedToUnboundedFixer(errorOnOverflow: Boolean)
}
}

class BatchedUnboundedToUnboundedBinaryFixer(val binOp: BinaryOp, val dataType: DataType)
extends BatchedUnboundedToUnboundedWindowFixer {
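// Running aggregate accumulated from every batch seen so far (None until the first update).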
private var previousResult: Option[Scalar] = None

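// First pass: fold this batch's pre-reduced partial result into the running aggregate.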
override def updateState(scalar: Scalar): Unit = previousResult match {
case None =>
previousResult = Some(scalar.incRefCount())
case Some(prev) =>
// This is ugly, but for now it is simple to make it work
val result = withResource(ColumnVector.fromScalar(prev, 1)) { p1 =>
withResource(p1.binaryOp(binOp, scalar, prev.getType)) { result1 =>
result1.getScalarElement(0)
}
}
closeOnExcept(result) { _ =>
previousResult.foreach(_.close)
previousResult = Some(result)
}
}

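// Second pass: rows belonging to the current partition group (per samePartitionMask) are
// replaced with the final aggregate; rows from other groups keep their existing values.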
override def fixUp(samePartitionMask: Either[ColumnVector, Boolean],
column: ColumnVector): ColumnVector = {
val scalar = previousResult match {
case Some(value) =>
value.incRefCount()
case None =>
GpuScalar.from(null, dataType)
}

withResource(scalar) { scalar =>
samePartitionMask match {
case scala.Left(cv) =>
cv.ifElse(scalar, column)
case scala.Right(true) =>
ColumnVector.fromScalar(scalar, column.getRowCount.toInt)
case _ =>
column.incRefCount()
}
}
}

override def close(): Unit = reset()

override def reset(): Unit = {
previousResult.foreach(_.close())
previousResult = None
}
}
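
To make the two-pass flow concrete, here is a minimal JVM-side sketch of the same pattern, with plain Longs standing in for cudf Scalars and ColumnVectors (illustrative names, not the plugin's API), driven roughly the way a double-pass window exec would drive the real fixer:

// Illustrative analogue of BatchedUnboundedToUnboundedBinaryFixer, specialized to min over Longs.
final class MinFixerSketch {
  private var previous: Option[Long] = None              // state carried across batches
  def updateState(batchPartial: Long): Unit =            // pass 1: fold each batch's partial min
    previous = Some(previous.fold(batchPartial)(math.min(_, batchPartial)))
  def fixUp(batch: Array[Long]): Array[Long] =           // pass 2: broadcast the final min to every row
    Array.fill(batch.length)(previous.get)
  def reset(): Unit = previous = None                    // called at partition-group boundaries
}

val batches = Seq(Array(5L, 3L, 9L), Array(7L, 2L), Array(8L))
val fixer = new MinFixerSketch
batches.foreach(b => fixer.updateState(b.min))           // first pass accumulates partials
val fixed = batches.map(fixer.fixUp)                     // second pass rewrites; every row becomes 2L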

/**
* This class fixes up batched running windows by performing a binary op on the previous value and
 * those in the same partition by key group. It does not deal with nulls, so it works for things
@@ -573,6 +573,7 @@ object GpuMin{

abstract class GpuMin(child: Expression) extends GpuAggregateFunction
with GpuBatchedRunningWindowWithFixer
with GpuUnboundToUnboundWindowWithFixer
with GpuAggregateWindowFunction
with GpuRunningWindowFunction
with Serializable {
@@ -602,6 +603,10 @@ abstract class GpuMin(child: Expression) extends GpuAggregateFunction
override def newFixer(): BatchedRunningWindowFixer =
new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MIN, "min")

// UNBOUNDED TO UNBOUNDED WINDOW
override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer =
new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MIN, dataType)

override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
inputProjection

@@ -752,6 +757,7 @@ object GpuMax {

abstract class GpuMax(child: Expression) extends GpuAggregateFunction
with GpuBatchedRunningWindowWithFixer
with GpuUnboundToUnboundWindowWithFixer
with GpuAggregateWindowFunction
with GpuRunningWindowFunction
with Serializable {
@@ -781,6 +787,10 @@ abstract class GpuMax(child: Expression) extends GpuAggregateFunction
override def newFixer(): BatchedRunningWindowFixer =
new BatchedRunningWindowBinaryFixer(BinaryOp.NULL_MAX, "max")

// UNBOUNDED TO UNBOUNDED WINDOW
override def newUnboundedToUnboundedFixer: BatchedUnboundedToUnboundedWindowFixer =
new BatchedUnboundedToUnboundedBinaryFixer(BinaryOp.NULL_MAX, dataType)

override def groupByScanInputProjection(isRunningBatched: Boolean): Seq[Expression] =
inputProjection
