From df2464b5388fedce371e04241cb57b1e7ca2dea7 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Tue, 24 Nov 2020 16:12:25 +0800 Subject: [PATCH 1/3] support GpuFilter and GpuCoalesceBatches for decimal data --- .../nvidia/spark/rapids/GpuOverrides.scala | 3 +- .../rapids/GpuCoalesceBatchesSuite.scala | 66 +++++++++++-------- .../com/nvidia/spark/rapids/TestUtils.scala | 2 + 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a812300c37f..7516fb8f497 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2106,7 +2106,8 @@ object GpuOverrides { allowMaps = true, allowArray = true, allowStruct = true, - allowNesting = true) + allowNesting = true, + allowDecimal = true) override def convertToGpu(): GpuExec = GpuFilterExec(childExprs(0).convertToGpu(), childPlans(0).convertIfNeeded()) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index af0d0482992..10539cbe14a 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -24,14 +24,14 @@ import com.nvidia.spark.rapids.format.CodecType import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.metrics.source.MockTaskContext -import org.apache.spark.sql.types.{DataType, DataTypes, LongType, StructField, StructType} +import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, LongType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { test("test with small input batches") { withGpuSparkSession(spark => { - val testData = doubleCsvDf(spark).coalesce(1) + val testData = mixedDf(spark, numSlices = 1) val gpuRowToColumnarExec = GpuRowToColumnarExec(testData.queryExecution.sparkPlan, TargetSize(1)) val gpuCoalesceBatches = GpuCoalesceBatches(gpuRowToColumnarExec, TargetSize(100000)) @@ -43,7 +43,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { // assert final results are correct assert(batches.hasNext) val batch = batches.next() - assert(batch.numCols() == 2) + assert(batch.numCols() == 5) assert(batch.numRows() == 14) assert(!batches.hasNext) batch.close() @@ -229,13 +229,13 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { withGpuSparkSession(spark => { - val df = longsCsvDf(spark) + val df = mixedDf(spark, numSlices = 14) // A coalesce step is added after the filter to help with the case where much of the // data is filtered out. The select is there to prevent the coalesce from being // the last thing in the plan which will cause the coalesce to be optimized out. 
       val df2 = df
-        .filter(df.col("six").gt(5)).select(df.col("six") * 2)
+        .filter(df.col("ints").gt(90)).select(df.col("decimals"))
 
       val coalesce = df2.queryExecution.executedPlan
         .find(_.isInstanceOf[GpuCoalesceBatches]).get
 
@@ -252,8 +252,8 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
       df2.collect()
 
       // assert the metrics are correct
-      assert(coalesce.additionalMetrics("numInputBatches").value == 7)
-      assert(coalesce.longMetric(GpuMetricNames.NUM_OUTPUT_BATCHES).value == 7)
+      assert(coalesce.additionalMetrics("numInputBatches").value == 14)
+      assert(coalesce.longMetric(GpuMetricNames.NUM_OUTPUT_BATCHES).value == 11)
     }, conf)
   }
 
@@ -280,6 +280,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     }
 
     val schema = new StructType().add("i", LongType)
+      .add("j", DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3))
     val dummyMetric = new SQLMetric("ignored")
     val coalesceIter = new GpuCoalesceIterator(
       batchIter,
@@ -299,12 +300,16 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     var expected = 0
     while (coalesceIter.hasNext) {
       withResource(coalesceIter.next()) { batch =>
-        assertResult(1)(batch.numCols)
-        val col = GpuColumnVector.extractBases(batch).head
-        withResource(col.copyToHost) { hcv =>
-          (0 until hcv.getRowCount.toInt).foreach { i =>
-            assertResult(expected)(hcv.getLong(i))
-            expected += 1
+        assertResult(2)(batch.numCols)
+        val Array(longCol, decCol) = GpuColumnVector.extractBases(batch)
+        withResource(longCol.copyToHost) { longHcv =>
+          withResource(decCol.copyToHost) { decHcv =>
+            (0 until longHcv.getRowCount.toInt).foreach { i =>
+              assertResult(expected)(longHcv.getLong(i))
+              assertResult(expected)(decHcv.getLong(i))
+              assertResult(BigDecimal(expected, 3).bigDecimal)(decHcv.getBigDecimal(i))
+              expected += 1
+            }
           }
         }
       }
@@ -356,6 +361,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     }
 
     val schema = new StructType().add("i", LongType)
+      .add("j", DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3))
     val dummyMetric = new SQLMetric("ignored")
     val coalesceIter = new GpuCoalesceIterator(
       batchIter,
@@ -375,12 +381,16 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     var expected = 0
     while (coalesceIter.hasNext) {
      withResource(coalesceIter.next()) { batch =>
-        assertResult(1)(batch.numCols)
-        val col = GpuColumnVector.extractBases(batch).head
-        withResource(col.copyToHost) { hcv =>
-          (0 until hcv.getRowCount.toInt).foreach { i =>
-            assertResult(expected)(hcv.getLong(i))
-            expected += 1
+        assertResult(2)(batch.numCols)
+        val Array(longCol, decCol) = GpuColumnVector.extractBases(batch)
+        withResource(longCol.copyToHost) { longHcv =>
+          withResource(decCol.copyToHost) { decHcv =>
+            (0 until longHcv.getRowCount.toInt).foreach { i =>
+              assertResult(expected)(longHcv.getLong(i))
+              assertResult(expected)(decHcv.getLong(i))
+              assertResult(BigDecimal(expected, 3).bigDecimal)(decHcv.getBigDecimal(i))
+              expected += 1
+            }
           }
         }
       }
@@ -390,10 +400,14 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
 
   private def buildContiguousTable(start: Int, numRows: Int): ContiguousTable = {
     val vals = (0 until numRows).map(_.toLong + start)
-    withResource(HostColumnVector.fromLongs(vals:_*)) { hcv =>
+    withResource(HostColumnVector.fromLongs(vals: _*)) { hcv =>
       withResource(hcv.copyToDevice()) { cv =>
-        withResource(new Table(cv)) { table =>
-          table.contiguousSplit()(0)
+        withResource(HostColumnVector.decimalFromLongs(-3, vals: _*)) { decHcv =>
+          withResource(decHcv.copyToDevice()) { decCv =>
+            withResource(new Table(cv, decCv)) { table =>
+              table.contiguousSplit()(0)
+            }
+          }
         }
       }
     }
@@ -401,7 +415,8 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
 
   private def buildUncompressedBatch(start: Int, numRows: Int): ColumnarBatch = {
     withResource(buildContiguousTable(start, numRows)) { ct =>
-      GpuColumnVector.from(ct.getTable, Array[DataType](LongType))
+      GpuColumnVector.from(ct.getTable,
+        Array[DataType](LongType, DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3)))
     }
   }
 
@@ -409,9 +424,8 @@ private def buildCompressedBatch(start: Int, numRows: Int): ColumnarBatch = {
     val codec = TableCompressionCodec.getCodec(CodecType.NVCOMP_LZ4)
     withResource(codec.createBatchCompressor(0, Cuda.DEFAULT_STREAM)) { compressor =>
       compressor.addTableToCompress(buildContiguousTable(start, numRows))
-      withResource(compressor.finish()) { compressedResults =>
-        GpuCompressedColumnVector.from(compressedResults.head, Array[DataType](LongType))
-      }
+      GpuCompressedColumnVector.from(compressor.finish().head,
+        Array[DataType](LongType, DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3)))
     }
   }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
index 534989a3106..fd825079711 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
@@ -121,6 +121,8 @@ object TestUtils extends Assertions with Arm {
       case DType.FLOAT32 => assertResult(e.getFloat(i))(a.getFloat(i))
       case DType.FLOAT64 => assertResult(e.getDouble(i))(a.getDouble(i))
       case DType.STRING => assertResult(e.getJavaString(i))(a.getJavaString(i))
+      case dt if dt.isDecimalType && dt.isBackedByLong =>
+        assertResult(e.getBigDecimal(i))(a.getBigDecimal(i))
       case _ => throw new UnsupportedOperationException("not implemented yet")
     }
   }

From b04e5b10c51817fff3a6d0aa62f70e1d018a2e71 Mon Sep 17 00:00:00 2001
From: sperlingxx
Date: Tue, 24 Nov 2020 19:33:25 +0800
Subject: [PATCH 2/3] add FilterExprSuite

Signed-off-by: sperlingxx
---
 .../nvidia/spark/rapids/FilterExprSuite.scala | 36 +++++++++++++++++++
 .../rapids/SparkQueryCompareTestSuite.scala   |  4 +--
 2 files changed, 38 insertions(+), 2 deletions(-)
 create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala
new file mode 100644
index 00000000000..469c85be700
--- /dev/null
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import org.apache.spark.sql.functions._
+
+class FilterExprSuite extends SparkQueryCompareTestSuite {
+  testSparkResultsAreEqual("filter with decimal literals", mixedDf(_), repart = 0) { df =>
+    df.select(col("doubles"), col("decimals"),
+      lit(BigDecimal(0L)).as("BigDec0"),
+      lit(BigDecimal(123456789L, 6)).as("BigDec1"),
+      lit(BigDecimal(-2.12314e-8)).as("BigDec2"))
+      .filter(col("doubles").gt(3.0))
+      .select("BigDec0", "BigDec1", "doubles", "decimals")
+  }
+
+  testSparkResultsAreEqual("filter with decimal columns", mixedDf(_), repart = 0) { df =>
+    df.filter(col("ints") > 90)
+      .filter(col("decimals").isNotNull)
+      .select("ints", "strings", "decimals")
+  }
+}
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
index a88fefe3554..02b5b6ae9a2 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala
@@ -944,11 +944,11 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
       Row(99, 400L, 4.0, "D", Decimal("1.5")),
       Row(98, 500L, 5.0, "E", Decimal("1.6")),
       Row(97, -100L, 6.0, "F", Decimal("1.7")),
-      Row(96, -500L, 0.0, "G", Decimal("1.8")),
+      Row(96, -500L, 0.0, "G", null),
       Row(95, -700L, 8.0, "E\u0480\u0481", Decimal("1.9")),
       Row(Int.MaxValue, Long.MinValue, Double.PositiveInfinity, "\u0000", Decimal("2.0")),
       Row(Int.MinValue, Long.MaxValue, Double.NaN, "\u0000", Decimal("100.123")),
-      Row(null, null, null, "actions are judged by intentions", Decimal("200.246")),
+      Row(null, null, null, "actions are judged by intentions", null),
       Row(94, -900L, 9.0, "g\nH", Decimal("300.369")),
       Row(92, -1200L, 12.0, "IJ\"\u0100\u0101\u0500\u0501", Decimal("-1.47e3")),
       Row(90, 1500L, 15.0, "\ud720\ud721", Decimal("-22.2345")))

From 4198f8ff8f6f7eeb6613f7efb12dcaff2f585412 Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Tue, 1 Dec 2020 23:26:28 +0800
Subject: [PATCH 3/3] Update
 tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala

Co-authored-by: Jason Lowe
---
 .../test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala
index 469c85be700..991eb4fd2b7 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/FilterExprSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
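
Note on the decimal scale convention exercised above (an illustrative aside, not
part of the patch series): cudf stores a decimal column's scale as a power-of-ten
exponent, so HostColumnVector.decimalFromLongs(-3, vals: _*) builds a DECIMAL64
column whose values equal unscaledValue * 10^-3, while java.math.BigDecimal
defines scale as the negated exponent. That sign flip is why the tests pair a
cudf scale of -3 with BigDecimal(expected, 3). Below is a minimal standalone
Scala sketch of the round trip, assuming the cudf Java API of this era; the
object name DecimalScaleDemo is made up for illustration.

    // Hypothetical demo, not part of the patch: round-trips unscaled longs
    // through a DECIMAL64 host column and checks both scale conventions.
    import ai.rapids.cudf.HostColumnVector

    object DecimalScaleDemo {
      def main(args: Array[String]): Unit = {
        val unscaled = Seq(0L, 1L, 2L, 1234L)
        // cudf scale -3: values become 0.000, 0.001, 0.002, 1.234
        val hcv = HostColumnVector.decimalFromLongs(-3, unscaled: _*)
        try {
          // cudf keeps the exponent itself on the DType
          assert(hcv.getType.getScale == -3)
          unscaled.indices.foreach { i =>
            // java.math.BigDecimal reports the negated exponent, 3
            assert(hcv.getBigDecimal(i) == BigDecimal(unscaled(i), 3).bigDecimal)
          }
        } finally {
          hcv.close()
        }
      }
    }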