From 84b554039687a0b0d0b0d1c898dc6d64b5fac874 Mon Sep 17 00:00:00 2001
From: Alfred Xu
Date: Tue, 1 Dec 2020 23:04:10 +0800
Subject: [PATCH] verify shuffle of decimal type data (#1193)

Signed-off-by: sperlingxx
---
 .../spark/rapids/GpuPartitioningSuite.scala   | 18 ++++++++++++++----
 .../rapids/GpuSinglePartitioningSuite.scala   |  9 +++++++--
 .../nvidia/spark/rapids/MetaUtilsSuite.scala  | 18 +++++++++++-------
 .../rapids/RapidsDeviceMemoryStoreSuite.scala |  7 +++++--
 .../spark/rapids/RapidsDiskStoreSuite.scala   |  7 +++++--
 .../rapids/RapidsHostMemoryStoreSuite.scala   |  7 +++++--
 .../com/nvidia/spark/rapids/TestUtils.scala   |  2 ++
 7 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala
index f334e111d8d..1a768c8aaa3 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala
@@ -17,6 +17,7 @@
 package com.nvidia.spark.rapids
 
 import java.io.File
+import java.math.RoundingMode
 
 import ai.rapids.cudf.{DType, Table}
 import org.scalatest.FunSuite
@@ -24,7 +25,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.rapids.{GpuShuffleEnv, RapidsDiskBlockManager}
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
+import org.apache.spark.sql.types.{DecimalType, DoubleType, IntegerType, StringType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class GpuPartitioningSuite extends FunSuite with Arm {
@@ -33,8 +34,11 @@ class GpuPartitioningSuite extends FunSuite with Arm {
     withResource(new Table.TestBuilder()
         .column(5, null.asInstanceOf[java.lang.Integer], 3, 1, 1, 1, 1, 1, 1, 1)
         .column("five", "two", null, null, "one", "one", "one", "one", "one", "one")
         .column(5.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
+        .decimal64Column(-3, RoundingMode.UNNECESSARY,
+          5.1, null, 3.3, 4.4e2, 0, -2.1e-1, 1.111, 2.345, null, 1.23e3)
         .build()) { table =>
-      GpuColumnVector.from(table, Array(IntegerType, StringType, DoubleType))
+      GpuColumnVector.from(table, Array(IntegerType, StringType, DoubleType,
+        DecimalType(DType.DECIMAL64_MAX_PRECISION, 3)))
     }
   }
@@ -53,8 +57,14 @@
     val expectedColumns = GpuColumnVector.extractBases(expected)
     val actualColumns = GpuColumnVector.extractBases(actual)
     expectedColumns.zip(actualColumns).foreach { case (expected, actual) =>
-      withResource(expected.equalToNullAware(actual)) { compareVector =>
-        withResource(compareVector.all(DType.BOOL8)) { compareResult =>
+      // FIXME: For decimal types, NULL_EQUALS is not yet supported in cuDF
+      val cpVec = if (expected.getType.isDecimalType) {
+        expected.equalTo(actual)
+      } else {
+        expected.equalToNullAware(actual)
+      }
+      withResource(cpVec) { compareVector =>
+        withResource(compareVector.all()) { compareResult =>
           assert(compareResult.getBoolean)
         }
       }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala
index 2a0f62c4737..5a4b719d758 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuSinglePartitioningSuite.scala
@@ -16,12 +16,14 @@
 
 package com.nvidia.spark.rapids
 
+import java.math.RoundingMode
+
 import ai.rapids.cudf.Table
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.rapids.GpuShuffleEnv
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
+import org.apache.spark.sql.types.{DecimalType, DoubleType, IntegerType, StringType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class GpuSinglePartitioningSuite extends FunSuite with Arm {
@@ -30,8 +32,11 @@ class GpuSinglePartitioningSuite extends FunSuite with Arm {
     withResource(new Table.TestBuilder()
         .column(5, null.asInstanceOf[java.lang.Integer], 3, 1, 1, 1, 1, 1, 1, 1)
         .column("five", "two", null, null, "one", "one", "one", "one", "one", "one")
         .column(5.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
+        .decimal64Column(-3, RoundingMode.UNNECESSARY,
+          5.1, null, 3.3, 4.4e2, 0, -2.1e-1, 1.111, 2.345, null, 1.23e3)
         .build()) { table =>
-      GpuColumnVector.from(table, Array(IntegerType, StringType, DoubleType))
+      GpuColumnVector.from(table, Array(IntegerType, StringType, DoubleType,
+        DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 3)))
     }
   }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
index 2917e9d4530..60107eda3ae 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/MetaUtilsSuite.scala
@@ -16,11 +16,13 @@
 
 package com.nvidia.spark.rapids
 
+import java.math.RoundingMode
+
 import ai.rapids.cudf.{BufferType, ContiguousTable, DeviceMemoryBuffer, Table}
 import com.nvidia.spark.rapids.format.{CodecType, ColumnMeta}
 import org.scalatest.FunSuite
 
-import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType, StructType}
+import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class MetaUtilsSuite extends FunSuite with Arm {
@@ -29,6 +31,7 @@ class MetaUtilsSuite extends FunSuite with Arm {
         .column(5, null.asInstanceOf[java.lang.Integer], 3, 1)
         .column("five", "two", null, null)
         .column(5.0, 2.0, 3.0, 1.0)
+        .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123)
         .build()) { table =>
       table.contiguousSplit()(0)
     }
@@ -109,12 +112,12 @@
   }
 
   test("buildDegenerateTableMeta no rows") {
-    val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
+    val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE, d DECIMAL(15, 5)")
     withResource(GpuColumnVector.emptyBatch(schema)) { batch =>
       val meta = MetaUtils.buildDegenerateTableMeta(batch)
       assertResult(null)(meta.bufferMeta)
       assertResult(0)(meta.rowCount)
-      assertResult(3)(meta.columnMetasLength)
+      assertResult(4)(meta.columnMetasLength)
       (0 until meta.columnMetasLength).foreach { i =>
         val columnMeta = meta.columnMetas(i)
         assertResult(0)(columnMeta.nullCount)
@@ -130,7 +133,7 @@
   }
 
   test("buildDegenerateTableMeta no rows compressed table") {
-    val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE")
+    val schema = StructType.fromDDL("a INT, b STRING, c DOUBLE, d DECIMAL(15, 5)")
     withResource(GpuColumnVector.emptyBatch(schema)) { uncompressedBatch =>
       val uncompressedMeta = MetaUtils.buildDegenerateTableMeta(uncompressedBatch)
       withResource(DeviceMemoryBuffer.allocate(0)) { buffer =>
@@ -140,7 +143,7 @@
           val meta = MetaUtils.buildDegenerateTableMeta(batch)
           assertResult(null)(meta.bufferMeta)
           assertResult(0)(meta.rowCount)
-          assertResult(3)(meta.columnMetasLength)
+          assertResult(4)(meta.columnMetasLength)
           (0 until meta.columnMetasLength).foreach { i =>
             val columnMeta = meta.columnMetas(i)
             assertResult(0)(columnMeta.nullCount)
@@ -163,9 +166,10 @@
       val table = contigTable.getTable
       val origBuffer = contigTable.getBuffer
       val meta = MetaUtils.buildTableMeta(10, table, origBuffer)
+      val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType,
+        DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 5))
       withResource(origBuffer.sliceWithCopy(0, origBuffer.getLength)) { buffer =>
-        withResource(MetaUtils.getBatchFromMeta(buffer, meta,
-          Array[DataType](IntegerType, StringType, DoubleType))) { batch =>
+        withResource(MetaUtils.getBatchFromMeta(buffer, meta, sparkTypes)) { batch =>
           assertResult(table.getRowCount)(batch.numRows)
           assertResult(table.getNumberOfColumns)(batch.numCols)
           (0 until table.getNumberOfColumns).foreach { i =>
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala
index 08cafc31a52..9effde6b9a4 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala
@@ -17,6 +17,7 @@
 package com.nvidia.spark.rapids
 
 import java.io.File
+import java.math.RoundingMode
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -29,7 +30,7 @@
 import org.scalatest.FunSuite
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.sql.rapids.RapidsDiskBlockManager
-import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType}
+import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType}
 
 class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
   private def buildContiguousTable(): ContiguousTable = {
@@ -37,6 +38,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
         .column(5, null.asInstanceOf[java.lang.Integer], 3, 1)
         .column("five", "two", null, null)
         .column(5.0, 2.0, 3.0, 1.0)
+        .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123)
         .build()) { table =>
       table.contiguousSplit()(0)
     }
@@ -106,7 +108,8 @@
   test("get column batch") {
     val catalog = new RapidsBufferCatalog
-    val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType)
+    val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType,
+      DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 5))
     withResource(new RapidsDeviceMemoryStore(catalog)) { store =>
       val bufferId = MockRapidsBufferId(7)
       withResource(buildContiguousTable()) { ct =>
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
index 641f8504b2d..20194320f9b 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala
@@ -17,6 +17,7 @@
 package com.nvidia.spark.rapids
 
 import java.io.File
+import java.math.RoundingMode
 
 import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, HostMemoryBuffer, Table}
 import org.mockito.ArgumentMatchers
@@ -25,7 +26,7 @@
 import org.scalatest.{BeforeAndAfterEach, FunSuite}
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.sql.rapids.RapidsDiskBlockManager
-import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType}
+import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, StringType}
 
 class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm with MockitoSugar {
   val TEST_FILES_ROOT: File = TestUtils.getTempDir(this.getClass.getSimpleName)
@@ -43,6 +44,7 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit
         .column(5, null.asInstanceOf[java.lang.Integer], 3, 1)
         .column("five", "two", null, null)
         .column(5.0, 2.0, 3.0, 1.0)
+        .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123)
         .build()) { table =>
       table.contiguousSplit()(0)
     }
@@ -83,7 +85,8 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit
   }
 
   test("get columnar batch") {
-    val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType)
+    val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType,
+      DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 5))
     val bufferId = MockRapidsBufferId(1, canShareDiskPaths = false)
     val bufferPath = bufferId.getDiskPath(null)
     assert(!bufferPath.exists)
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
index 4dfaa72fc5e..be7f8d4e9c4 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
@@ -17,6 +17,7 @@
 package com.nvidia.spark.rapids
 
 import java.io.File
+import java.math.RoundingMode
 
 import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, HostMemoryBuffer, Table}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -26,7 +27,7 @@
 import org.scalatest.FunSuite
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.sql.rapids.RapidsDiskBlockManager
-import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerType, LongType, StringType}
 
 class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
   private def buildContiguousTable(): ContiguousTable = {
@@ -34,6 +35,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
         .column(5, null.asInstanceOf[java.lang.Integer], 3, 1)
         .column("five", "two", null, null)
         .column(5.0, 2.0, 3.0, 1.0)
+        .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123)
         .build()) { table =>
       table.contiguousSplit()(0)
     }
@@ -119,7 +121,8 @@
   }
 
   test("get memory buffer") {
-    val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType)
+    val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType,
+      DecimalType(ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION, 5))
     val bufferId = MockRapidsBufferId(7)
     val spillPriority = -10
     val hostStoreMaxSize = 1L * 1024 * 1024
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
index 534989a3106..fd825079711 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/TestUtils.scala
@@ -121,6 +121,8 @@
         case DType.FLOAT32 => assertResult(e.getFloat(i))(a.getFloat(i))
         case DType.FLOAT64 => assertResult(e.getDouble(i))(a.getDouble(i))
         case DType.STRING => assertResult(e.getJavaString(i))(a.getJavaString(i))
+        case dt if dt.isDecimalType && dt.isBackedByLong =>
+          assertResult(e.getBigDecimal(i))(a.getBigDecimal(i))
         case _ => throw new UnsupportedOperationException("not implemented yet")
       }
     }
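
A note for readers on the two scale conventions the patch juggles: cuDF's decimal64Column(-3, ...) expresses "three digits after the decimal point" as a negative scale, while the matching Spark type is declared with a positive scale, DecimalType(DType.DECIMAL64_MAX_PRECISION, 3). The sketch below isolates that mapping plus the decimal comparison fallback used in GpuPartitioningSuite. It is illustrative only: the object and helper names and the single-column batch shape are invented here, it assumes the plugin's Arm trait and GpuColumnVector are on the classpath, and it uses no cuDF calls beyond those already appearing in the patch.

import java.math.RoundingMode

import ai.rapids.cudf.{ColumnVector, DType, Table}

import org.apache.spark.sql.types.{DataType, DecimalType}
import org.apache.spark.sql.vectorized.ColumnarBatch

object DecimalShuffleSketch extends Arm {
  // Hypothetical helper: a one-column DECIMAL64 batch. cuDF encodes "three
  // digits after the decimal point" as scale -3; the equivalent Spark
  // DecimalType uses a positive scale of 3.
  def buildDecimalBatch(): ColumnarBatch = {
    withResource(new Table.TestBuilder()
        .decimal64Column(-3, RoundingMode.UNNECESSARY,
          5.1, null, 3.3, 4.4e2, 0, -2.1e-1, 1.111, 2.345, null, 1.23e3)
        .build()) { table =>
      GpuColumnVector.from(table,
        Array[DataType](DecimalType(DType.DECIMAL64_MAX_PRECISION, 3)))
    }
  }

  // Hypothetical helper mirroring the suites' FIXME workaround: the
  // null-aware NULL_EQUALS comparison (equalToNullAware) is not yet
  // available for decimal columns in cuDF, so fall back to plain equalTo.
  def assertColumnsEqual(expected: ColumnVector, actual: ColumnVector): Unit = {
    val cpVec = if (expected.getType.isDecimalType) {
      expected.equalTo(actual)
    } else {
      expected.equalToNullAware(actual)
    }
    withResource(cpVec) { compareVector =>
      withResource(compareVector.all()) { compareResult =>
        assert(compareResult.getBoolean)
      }
    }
  }
}

Because equalTo yields null wherever either input is null, and the all() reduction skips nulls, the decimal path only checks equality of the non-null values; that weaker guarantee is the trade-off the FIXME in GpuPartitioningSuite records until cuDF supports NULL_EQUALS for decimal types.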