Unshim GpuSumDefaults [databricks] #5010

Merged: 1 commit, Mar 23, 2022
One file in this PR was deleted; its contents were not loaded in this view. The diff for the modified file follows.
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.aggregate.GpuSumDefaults
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -915,59 +914,35 @@ abstract class GpuDecimalSum(
   private lazy val zeroDec = GpuLiteral(Decimal(0, dt.precision, dt.scale), dt)
 
   override lazy val initialValues: Seq[GpuLiteral] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(zeroDec, GpuLiteral(true, BooleanType))
-    } else {
-      Seq(GpuLiteral(null, dt))
-    }
+    Seq(zeroDec, GpuLiteral(true, BooleanType))
   }
 
   // we need to cast to `resultType` here, since Spark is not widening types
   // as done before Spark 3.2.0. See CudfSum for more info.
   override lazy val inputProjection: Seq[Expression] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      // Spark tracks null columns through a second column isEmpty for decimal. So null values
-      // are replaced with 0, and a separate boolean column for isNull is added
-      Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child))
-    } else {
-      Seq(GpuCast(child, dt))
-    }
+    // Spark tracks null columns through a second column isEmpty for decimal. So null values
+    // are replaced with 0, and a separate boolean column for isNull is added
+    Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child))
   }
 
   protected lazy val updateIsEmpty: CudfAggregate = new CudfMin(BooleanType)
 
   override lazy val updateAggregates: Seq[CudfAggregate] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(updateSum, updateIsEmpty)
-    } else {
-      Seq(updateSum)
-    }
+    Seq(updateSum, updateIsEmpty)
   }
 
   override lazy val postUpdate: Seq[Expression] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr)
-    } else {
-      postUpdateAttr
-    }
+    Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr)
   }
 
   // Used for Decimal overflow detection
   protected lazy val isEmpty: AttributeReference = AttributeReference("isEmpty", BooleanType)()
   override lazy val aggBufferAttributes: Seq[AttributeReference] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(sum, isEmpty)
-    } else {
-      Seq(sum)
-    }
+    Seq(sum, isEmpty)
   }
 
   override lazy val preMerge: Seq[Expression] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(sum, isEmpty, GpuIsNull(sum))
-    } else {
-      aggBufferAttributes
-    }
+    Seq(sum, isEmpty, GpuIsNull(sum))
  }
 
   protected lazy val mergeIsEmpty: CudfAggregate = new CudfMin(BooleanType)
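To make the update-side bookkeeping concrete, here is a minimal plain-Scala model of what the inputProjection/updateAggregates pair computes for decimal SUM. The names are hypothetical, not the plugin's API; the point is that nulls fold into a zero sum value, and because a cudf MIN over booleans behaves as a logical AND, isEmpty survives as true only when every row in the group was null.

// Hypothetical model of the decimal SUM update state; not the plugin's API.
final case class SumState(sum: BigDecimal, isEmpty: Boolean)

object DecimalSumModel {
  // Mirrors initialValues: the sum starts at 0 and the group is assumed empty.
  val initial: SumState = SumState(BigDecimal(0), isEmpty = true)

  // Mirrors inputProjection + updateAggregates: a null row contributes 0 to the
  // sum, and isEmpty stays true only if this row and every prior row was null
  // (MIN over booleans == logical AND).
  def update(state: SumState, row: Option[BigDecimal]): SumState =
    SumState(state.sum + row.getOrElse(BigDecimal(0)), state.isEmpty && row.isEmpty)
}

Folding Seq(None, Some(BigDecimal("1.5")), None) from DecimalSumModel.initial gives SumState(1.5, isEmpty = false); an all-null group ends as SumState(0, isEmpty = true), which the final evaluation maps back to SQL NULL.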
@@ -977,32 +952,20 @@ abstract class GpuDecimalSum(
   // Cudf does not have such an aggregation, so for merge we have to work around that similar to
   // what happens with isEmpty
   override lazy val mergeAggregates: Seq[CudfAggregate] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(mergeSum, mergeIsEmpty, mergeIsOverflow)
-    } else {
-      Seq(mergeSum)
-    }
+    Seq(mergeSum, mergeIsEmpty, mergeIsOverflow)
   }
 
   override lazy val postMerge: Seq[Expression] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(
-        GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
-          GpuLiteral.create(null, dt),
-          mergeSum.attr),
-          dt, !failOnErrorOverride),
-        mergeIsEmpty.attr)
-    } else {
-      postMergeAttr
-    }
+    Seq(
+      GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
+        GpuLiteral.create(null, dt),
+        mergeSum.attr),
+        dt, !failOnErrorOverride),
+      mergeIsEmpty.attr)
   }
 
   override lazy val evaluateExpression: Expression = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride)
-    } else {
-      GpuCheckOverflow(sum, dt, !failOnErrorOverride)
-    }
+    GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride)
   }
 
   override def windowOutput(result: ColumnVector): ColumnVector = {
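A note on the merge path just shown before moving to the next class: a partial sum that overflowed during update has already been nulled out by GpuCheckOverflow, and cudf's SUM would silently skip that null, so merge carries an extra isOverflow column (GpuIsNull(sum)), and a MAX over those booleans acts as a logical OR. A hedged plain-Scala model with hypothetical names (empty partitions carry a zero sum, so a missing sum can only mean overflow):

// Hypothetical model of merging decimal SUM partials; not the plugin's API.
// sum == None marks a partition whose partial sum overflowed to null; empty
// partitions carry Some(0) with isEmpty = true.
final case class PartialSum(sum: Option[BigDecimal], isEmpty: Boolean)

def mergeAndEvaluate(parts: Seq[PartialSum], resultPrecision: Int): Option[BigDecimal] = {
  val anyOverflow = parts.exists(_.sum.isEmpty) // mergeIsOverflow: MAX over booleans == OR
  val allEmpty = parts.forall(_.isEmpty)        // mergeIsEmpty: MIN over booleans == AND
  if (anyOverflow || allEmpty) {
    None // one overflowed partition poisons the result; an all-null input is SQL NULL
  } else {
    val total = parts.flatMap(_.sum).sum
    // Plays the role of GpuCheckOverflowAfterSum: a result wider than the
    // declared precision becomes NULL (or an error when failOnError is set).
    if (total.precision <= resultPrecision) Some(total) else None
  }
}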
@@ -1077,47 +1040,29 @@ case class GpuDecimal128Sum(
 
   override lazy val inputProjection: Seq[Expression] = {
     val chunks = (0 until 4).map {
-      GpuExtractChunk32(GpuCast(child, dt), _, GpuSumDefaults.hasIsEmptyField)
-    }
-    if (GpuSumDefaults.hasIsEmptyField) {
-      // Spark tracks null columns through a second column isEmpty for decimal. So null values
-      // are replaced with 0, and a separate boolean column for isNull is added
-      chunks :+ GpuIsNull(child)
-    } else {
-      chunks
+      GpuExtractChunk32(GpuCast(child, dt), _, replaceNullsWithZero = true)
     }
+    // Spark tracks null columns through a second column isEmpty for decimal. So null values
+    // are replaced with 0, and a separate boolean column for isNull is added
+    chunks :+ GpuIsNull(child)
   }
 
   private lazy val updateSumChunks = (0 until 4).map(_ => new CudfSum(LongType))
 
-  override lazy val updateAggregates: Seq[CudfAggregate] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      updateSumChunks :+ updateIsEmpty
-    } else {
-      updateSumChunks
-    }
-  }
+  override lazy val updateAggregates: Seq[CudfAggregate] = updateSumChunks :+ updateIsEmpty
 
   override lazy val postUpdate: Seq[Expression] = {
     val assembleExpr = GpuAssembleSumChunks(updateSumChunks.map(_.attr), dt, !failOnErrorOverride)
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr)
-    } else {
-      Seq(assembleExpr)
-    }
+    Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr)
   }
 
   override lazy val preMerge: Seq[Expression] = {
     val chunks = (0 until 4).map {
       GpuExtractChunk32(sum, _, replaceNullsWithZero = false)
     }
-    if (GpuSumDefaults.hasIsEmptyField) {
-      // Spark tracks null columns through a second column isEmpty for decimal. So null values
-      // are replaced with 0, and a separate boolean column for isNull is added
-      chunks ++ Seq(isEmpty, GpuIsNull(sum))
-    } else {
-      chunks
-    }
+    // Spark tracks null columns through a second column isEmpty for decimal. So null values
+    // are replaced with 0, and a separate boolean column for isNull is added
+    chunks ++ Seq(isEmpty, GpuIsNull(sum))
   }
 
   private lazy val mergeSumChunks = (0 until 4).map(_ => new CudfSum(LongType))
@@ -1126,25 +1071,17 @@ case class GpuDecimal128Sum(
   // Cudf does not have such an aggregation, so for merge we have to work around that similar to
   // what happens with isEmpty
   override lazy val mergeAggregates: Seq[CudfAggregate] = {
-    if (GpuSumDefaults.hasIsEmptyField) {
-      mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow)
-    } else {
-      mergeSumChunks
-    }
+    mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow)
   }
 
   override lazy val postMerge: Seq[Expression] = {
     val assembleExpr = GpuAssembleSumChunks(mergeSumChunks.map(_.attr), dt, !failOnErrorOverride)
-    if (GpuSumDefaults.hasIsEmptyField) {
-      Seq(
-        GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
-          GpuLiteral.create(null, dt),
-          assembleExpr),
-          dt, !failOnErrorOverride),
-        mergeIsEmpty.attr)
-    } else {
-      Seq(assembleExpr)
-    }
+    Seq(
+      GpuCheckOverflow(GpuIf(mergeIsOverflow.attr,
+        GpuLiteral.create(null, dt),
+        assembleExpr),
+        dt, !failOnErrorOverride),
+      mergeIsEmpty.attr)
   }
 
   // Replacement Window Function
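GpuDecimal128Sum splits each 128-bit decimal into four 32-bit chunks (GpuExtractChunk32), sums each chunk column independently in 64-bit longs, and reassembles the wide result with its carries (GpuAssembleSumChunks). Below is a rough self-contained sketch of that idea over BigInt; it is illustrative only, since the real operators work on cudf columns and feed into the overflow checks shown above.

// Illustrative-only model of chunked 128-bit summation; not the plugin's API.
object Chunked128Sum {
  // The low three chunks are extracted as unsigned 32-bit values; the most
  // significant chunk keeps its sign so negative values reassemble correctly.
  def extractChunk32(value: BigInt, chunk: Int): Long =
    if (chunk == 3) (value >> 96).toLong
    else ((value >> (32 * chunk)) & 0xFFFFFFFFL).toLong

  // Reassemble the per-chunk sums, letting carries propagate in BigInt space.
  def assembleChunks(chunkSums: Seq[Long]): BigInt =
    chunkSums.zipWithIndex.map { case (s, i) => BigInt(s) << (32 * i) }.sum

  // Each 64-bit chunk accumulator has roughly 32 bits of headroom, so the
  // per-chunk sums cannot overflow for up to about 2^31 rows even when the
  // full 128-bit values would.
  def sum128(values: Seq[BigInt]): BigInt = {
    val chunkSums = (0 until 4).map(i => values.map(extractChunk32(_, i)).sum)
    assembleChunks(chunkSums)
  }
}

For example, Chunked128Sum.sum128(Seq(BigInt(Long.MaxValue), BigInt(1))) returns BigInt(2).pow(63) without any 64-bit overflow in an intermediate step.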