From d11e3a3d667210137965bc3f5a5217f27039e15b Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 16 Feb 2022 13:21:55 -0600 Subject: [PATCH] Avoid pre-cast by using unsigned type output for GpuExtractChunk32 Signed-off-by: Jason Lowe --- .../nvidia/spark/rapids/GpuColumnVector.java | 10 +++- .../nvidia/spark/rapids/GpuDataTypes.scala | 58 +++++++++++++++++++ .../com/nvidia/spark/rapids/literals.scala | 31 ++++++++-- .../spark/sql/rapids/AggregateFunctions.scala | 14 ++--- 4 files changed, 96 insertions(+), 17 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index 016e2c1a865..29223d73a47 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -152,7 +152,11 @@ public static synchronized void debug(String name, HostColumnVectorCore hostCol) || DType.TIMESTAMP_SECONDS.equals(type) || DType.TIMESTAMP_MICROSECONDS.equals(type) || DType.TIMESTAMP_MILLISECONDS.equals(type) - || DType.TIMESTAMP_NANOSECONDS.equals(type)) { + || DType.TIMESTAMP_NANOSECONDS.equals(type) + || DType.UINT8.equals(type) + || DType.UINT16.equals(type) + || DType.UINT32.equals(type) + || DType.UINT64.equals(type)) { debugInteger(hostCol, type); } else if (DType.BOOL8.equals(type)) { for (int i = 0; i < hostCol.getRowCount(); i++) { @@ -486,6 +490,10 @@ private static DType toRapidsOrNull(DataType type) { // So, we don't have to handle decimal-supportable problem at here. 
DecimalType dt = (DecimalType) type; return DecimalUtil.createCudfDecimal(dt.precision(), dt.scale()); + } else if (type instanceof GpuUnsignedIntegerType) { + return DType.UINT32; + } else if (type instanceof GpuUnsignedLongType) { + return DType.UINT64; } return null; } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala new file mode 100644 index 00000000000..8e81df94aaf --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataTypes.scala @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import org.apache.spark.sql.types.DataType + +/** + * An unsigned, 32-bit integer type that maps to DType.UINT32 in cudf. + * @note This type should NOT be used in Catalyst plan nodes that could be exposed to + * CPU expressions. + */ +class GpuUnsignedIntegerType private() extends DataType { + // The companion object and this class are separated so the companion object also subclasses + // this type. Otherwise the companion object would be of type "GpuUnsignedIntegerType$" in + // byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. 
+ override def defaultSize: Int = 4 + + override def simpleString: String = "uint" + + override def asNullable: DataType = this +} + +case object GpuUnsignedIntegerType extends GpuUnsignedIntegerType + + +/** + * An unsigned, 64-bit integer type that maps to DType.UINT64 in cudf. + * @note This type should NOT be used in Catalyst plan nodes that could be exposed to + * CPU expressions. + */ +class GpuUnsignedLongType private() extends DataType { + // The companion object and this class are separated so the companion object also subclasses + // this type. Otherwise the companion object would be of type "GpuUnsignedLongType$" in + // byte code. + // Defined with a private constructor so the companion object is the only possible instantiation. + override def defaultSize: Int = 8 + + override def simpleString: String = "ulong" + + override def asNullable: DataType = this +} + +case object GpuUnsignedLongType extends GpuUnsignedLongType diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala index 0984ec2580f..9060c65474a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -52,8 +52,8 @@ object GpuScalar extends Arm with Logging { case DType.FLOAT64 => v.getDouble case DType.INT8 => v.getByte case DType.INT16 => v.getShort - case DType.INT32 => v.getInt - case DType.INT64 => v.getLong + case DType.INT32 | DType.UINT32 => v.getInt + case DType.INT64 | DType.UINT64 => v.getLong case DType.TIMESTAMP_DAYS => v.getInt case DType.TIMESTAMP_MICROSECONDS => v.getLong case DType.STRING => UTF8String.fromBytes(v.getUTF8) @@ -356,6 +356,21 @@ object GpuScalar extends Arm with Logging { case _ => throw new IllegalArgumentException(s"'$v: ${v.getClass}' is not supported" + s" for MapType, expecting MapData") } + case GpuUnsignedIntegerType => v match { + case i: Int => Scalar.fromUnsignedInt(i) + case s: Short => Scalar.fromUnsignedInt(s.toInt) + case b: Byte => Scalar.fromUnsignedInt(b.toInt) + case _ => throw new IllegalArgumentException(s"'$v: ${v.getClass}' is not supported" + + s" for IntegerType, expecting Int.") + } + case GpuUnsignedLongType => v match { + case l: Long => Scalar.fromUnsignedLong(l) + case i: Int => Scalar.fromUnsignedLong(i.toLong) + case s: Short => Scalar.fromUnsignedLong(s.toLong) + case b: Byte => Scalar.fromUnsignedLong(b.toLong) + case _ => throw new IllegalArgumentException(s"'$v: ${v.getClass}' is not supported" + + s" for LongType, expecting Long, or Int.") + } case _ => throw new UnsupportedOperationException(s"${v.getClass} '$v' is not supported" + s" as a Scalar yet") } @@ -564,10 +579,14 @@ object GpuLiteral { * Create a GPU literal with default value for given DataType */ def default(dataType: DataType): GpuLiteral = { - val cpuLiteral = Literal.default(dataType) - GpuLiteral(cpuLiteral.value, cpuLiteral.dataType) + dataType match { + case GpuUnsignedIntegerType => GpuLiteral(0, dataType) + case GpuUnsignedLongType => GpuLiteral(0L, dataType) + case _ => + val cpuLiteral = Literal.default(dataType) + GpuLiteral(cpuLiteral.value, cpuLiteral.dataType) + } } - } /** diff --git 
a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala index 99018c3d986..3a96f9806e0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/AggregateFunctions.scala @@ -575,7 +575,7 @@ case class GpuExtractChunk32( replaceNullsWithZero: Boolean) extends GpuExpression with ShimExpression { override def nullable: Boolean = true - override def dataType: DataType = LongType + override def dataType: DataType = if (chunkIdx < 3) GpuUnsignedIntegerType else IntegerType override def sql: String = data.sql @@ -596,12 +596,7 @@ case class GpuExtractChunk32( } else { chunkCol } - // TODO: This is a workaround for a libcudf sort-aggregation bug, see - // https://github.com/rapidsai/cudf/issues/10246. Ideally we should not need to pay the time - // and memory to upcast here since we should be able to get the upcast from libcudf for free. - withResource(replacedCol) { replacedCol => - GpuColumnVector.from(replacedCol.castTo(DType.INT64), dataType) - } + GpuColumnVector.from(replacedCol, dataType) } } @@ -1488,9 +1483,8 @@ case class GpuDecimal128Average(child: Expression, dt: DecimalType) // decimal aggregations. The null gets inserted back in with evaluateExpression where // a divide by 0 gets replaced with a null. val chunks = (0 until 4).map { chunkIdx => - GpuCoalesce(Seq( - GpuExtractChunk32(GpuCast(child, dt), chunkIdx, replaceNullsWithZero = false), - GpuLiteral.default(LongType))) + val extract = GpuExtractChunk32(GpuCast(child, dt), chunkIdx, replaceNullsWithZero = false) + GpuCoalesce(Seq(extract, GpuLiteral.default(extract.dataType))) } val forCount = GpuCast(GpuIsNotNull(child), LongType) chunks :+ forCount