diff --git a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/aggregate/GpuSum.scala b/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/aggregate/GpuSum.scala deleted file mode 100644 index 057f588eddf..00000000000 --- a/sql-plugin/src/main/311+-all/scala/org/apache/spark/sql/rapids/aggregate/GpuSum.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) 2021, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.rapids.aggregate - -object GpuSumDefaults { - val hasIsEmptyField: Boolean = true -} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index b7bf51b7caa..5bd4a8c3cbd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.aggregate.GpuSumDefaults import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch @@ -915,59 +914,35 @@ abstract class GpuDecimalSum( private lazy val zeroDec = GpuLiteral(Decimal(0, dt.precision, dt.scale), dt) override lazy val initialValues: Seq[GpuLiteral] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq(zeroDec, GpuLiteral(true, BooleanType)) - } else { - Seq(GpuLiteral(null, dt)) - } + Seq(zeroDec, GpuLiteral(true, BooleanType)) } // we need to cast to `resultType` here, since Spark is not widening types // as done before Spark 3.2.0. See CudfSum for more info. override lazy val inputProjection: Seq[Expression] = { - if (GpuSumDefaults.hasIsEmptyField) { - // Spark tracks null columns through a second column isEmpty for decimal. So null values - // are replaced with 0, and a separate boolean column for isNull is added - Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child)) - } else { - Seq(GpuCast(child, dt)) - } + // Spark tracks null columns through a second column isEmpty for decimal. So null values + // are replaced with 0, and a separate boolean column for isNull is added + Seq(GpuIf(GpuIsNull(child), zeroDec, GpuCast(child, dt)), GpuIsNull(child)) } protected lazy val updateIsEmpty: CudfAggregate = new CudfMin(BooleanType) override lazy val updateAggregates: Seq[CudfAggregate] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq(updateSum, updateIsEmpty) - } else { - Seq(updateSum) - } + Seq(updateSum, updateIsEmpty) } override lazy val postUpdate: Seq[Expression] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr) - } else { - postUpdateAttr - } + Seq(GpuCheckOverflow(updateSum.attr, dt, !failOnErrorOverride), updateIsEmpty.attr) } // Used for Decimal overflow detection protected lazy val isEmpty: AttributeReference = AttributeReference("isEmpty", BooleanType)() override lazy val aggBufferAttributes: Seq[AttributeReference] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq(sum, isEmpty) - } else { - Seq(sum) - } + Seq(sum, isEmpty) } override lazy val preMerge: Seq[Expression] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq(sum, isEmpty, GpuIsNull(sum)) - } else { - aggBufferAttributes - } + Seq(sum, isEmpty, GpuIsNull(sum)) } protected lazy val mergeIsEmpty: CudfAggregate = new CudfMin(BooleanType) @@ -977,32 +952,20 @@ abstract class GpuDecimalSum( // Cudf does not have such an aggregation, so for merge we have to work around that similar to // what happens with isEmpty override lazy val mergeAggregates: Seq[CudfAggregate] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq(mergeSum, mergeIsEmpty, mergeIsOverflow) - } else { - Seq(mergeSum) - } + Seq(mergeSum, mergeIsEmpty, mergeIsOverflow) } override lazy val postMerge: Seq[Expression] = { - if (GpuSumDefaults.hasIsEmptyField) { - Seq( - GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, - GpuLiteral.create(null, dt), - mergeSum.attr), - dt, !failOnErrorOverride), - mergeIsEmpty.attr) - } else { - postMergeAttr - } + Seq( + GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, + GpuLiteral.create(null, dt), + mergeSum.attr), + dt, !failOnErrorOverride), + mergeIsEmpty.attr) } override lazy val evaluateExpression: Expression = { - if (GpuSumDefaults.hasIsEmptyField) { - GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride) - } else { - GpuCheckOverflow(sum, dt, !failOnErrorOverride) - } + GpuCheckOverflowAfterSum(sum, isEmpty, dt, !failOnErrorOverride) } override def windowOutput(result: ColumnVector): ColumnVector = { @@ -1077,47 +1040,29 @@ case class GpuDecimal128Sum( override lazy val inputProjection: Seq[Expression] = { val chunks = (0 until 4).map { - GpuExtractChunk32(GpuCast(child, dt), _, GpuSumDefaults.hasIsEmptyField) - } - if (GpuSumDefaults.hasIsEmptyField) { - // Spark tracks null columns through a second column isEmpty for decimal. So null values - // are replaced with 0, and a separate boolean column for isNull is added - chunks :+ GpuIsNull(child) - } else { - chunks + GpuExtractChunk32(GpuCast(child, dt), _, replaceNullsWithZero = true) } + // Spark tracks null columns through a second column isEmpty for decimal. So null values + // are replaced with 0, and a separate boolean column for isNull is added + chunks :+ GpuIsNull(child) } private lazy val updateSumChunks = (0 until 4).map(_ => new CudfSum(LongType)) - override lazy val updateAggregates: Seq[CudfAggregate] = { - if (GpuSumDefaults.hasIsEmptyField) { - updateSumChunks :+ updateIsEmpty - } else { - updateSumChunks - } - } + override lazy val updateAggregates: Seq[CudfAggregate] = updateSumChunks :+ updateIsEmpty override lazy val postUpdate: Seq[Expression] = { val assembleExpr = GpuAssembleSumChunks(updateSumChunks.map(_.attr), dt, !failOnErrorOverride) - if (GpuSumDefaults.hasIsEmptyField) { - Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr) - } else { - Seq(assembleExpr) - } + Seq(GpuCheckOverflow(assembleExpr, dt, !failOnErrorOverride), updateIsEmpty.attr) } override lazy val preMerge: Seq[Expression] = { val chunks = (0 until 4).map { GpuExtractChunk32(sum, _, replaceNullsWithZero = false) } - if (GpuSumDefaults.hasIsEmptyField) { - // Spark tracks null columns through a second column isEmpty for decimal. So null values - // are replaced with 0, and a separate boolean column for isNull is added - chunks ++ Seq(isEmpty, GpuIsNull(sum)) - } else { - chunks - } + // Spark tracks null columns through a second column isEmpty for decimal. So null values + // are replaced with 0, and a separate boolean column for isNull is added + chunks ++ Seq(isEmpty, GpuIsNull(sum)) } private lazy val mergeSumChunks = (0 until 4).map(_ => new CudfSum(LongType)) @@ -1126,25 +1071,17 @@ case class GpuDecimal128Sum( // Cudf does not have such an aggregation, so for merge we have to work around that similar to // what happens with isEmpty override lazy val mergeAggregates: Seq[CudfAggregate] = { - if (GpuSumDefaults.hasIsEmptyField) { - mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow) - } else { - mergeSumChunks - } + mergeSumChunks ++ Seq(mergeIsEmpty, mergeIsOverflow) } override lazy val postMerge: Seq[Expression] = { val assembleExpr = GpuAssembleSumChunks(mergeSumChunks.map(_.attr), dt, !failOnErrorOverride) - if (GpuSumDefaults.hasIsEmptyField) { - Seq( - GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, - GpuLiteral.create(null, dt), - assembleExpr), - dt, !failOnErrorOverride), - mergeIsEmpty.attr) - } else { - Seq(assembleExpr) - } + Seq( + GpuCheckOverflow(GpuIf(mergeIsOverflow.attr, + GpuLiteral.create(null, dt), + assembleExpr), + dt, !failOnErrorOverride), + mergeIsEmpty.attr) } // Replacement Window Function