Test Parquet double column stat without NaN [databricks] (#9256)
* Test Parquet double column stat without NaN

Signed-off-by: Chong Gao <res_life@163.com>
Co-authored-by: Chong Gao <res_life@163.com>
res-life and Chong Gao authored Sep 20, 2023
1 parent 3f8bb23 commit 6da6584
Showing 2 changed files with 162 additions and 1 deletion.
@@ -205,7 +205,7 @@ class ParquetScaleTestSuite extends SparkQueryCompareTestSuite with Logging {
    "date",
    "timestamp")

test("Statistics tests for Parquet files written by GPU, float/double") {
test("Statistics tests for Parquet files written by GPU, float/double, with NaN") {
assume(false, "Blocked by https://github.com/rapidsai/cudf/issues/13948")
assume(false, "Move to scale test")

@@ -236,6 +236,114 @@ class ParquetScaleTestSuite extends SparkQueryCompareTestSuite with Logging {
    // checkStats(tab)
  }

test("Statistics tests for Parquet files written by GPU, float/double, without NaN") {
assume(true, "Move to scale test")
val schemaStr =
"""
struct<
c01: float,
c02: double
>
"""
val gen = DBGen()
val tab = gen.addTable("tab", schemaStr, rowsNum)
tab("c01").setValueGen(NonNaNFloatGenFunc())
tab("c02").setValueGen(NonNaNDoubleGenFunc())

def genDf(tab: TableGen): SparkSession => DataFrame = spark => tab.toDF(spark)

withTempPath { testDataFile =>
// Write test data to a file on CPU
writeScaleTestDataOnCpu(testDataFile, genDf(tab))

// write data and get stats on CPU
val cpuStats = withCpuSparkSession(getStats(testDataFile), sparkConf)
val cpuFileSize = testDataFile.listFiles(f => f.getName.endsWith(".parquet"))(0).length()

// write data and get stats on GPU
val gpuStats = withGpuSparkSession(getStats(testDataFile), sparkConf)
val gpuFileSize = testDataFile.listFiles(f => f.getName.endsWith(".parquet"))(0).length()

// compare schema
assertResult(cpuStats.schema)(gpuStats.schema)

      // Check that the GPU file size is not too large
      assert(gpuFileSize < 2 * cpuFileSize)

      /**
       * CPU stat:
       *
       * ParquetStat(WrappedArray([c01] optional float c01, [c02] optional double c02),
       *   WrappedArray(RowGroupStat(1000000,
       *     WrappedArray(
       *       ColumnChunkStat(optional float c01,
       *         min = 0.0, max = 0.0,
       *         hasNonNullValue = false,
       *         isNumNullsSet = true,
       *         numNulls = 500532),
       *       ColumnChunkStat(optional double c02,
       *         min = 0.0,
       *         max = 0.0,
       *         hasNonNullValue = false,
       *         isNumNullsSet = true,
       *         numNulls = 498986)))))
       *
       * GPU stat:
       *
       * ParquetStat(WrappedArray([c01] optional float c01, [c02] optional double c02),
       *   WrappedArray(RowGroupStat(1000000,
       *     WrappedArray(
       *       ColumnChunkStat(optional float c01,
       *         min = -3.4026107E38,
       *         max = 3.4015179E38,
       *         hasNonNullValue = true,
       *         isNumNullsSet = true,
       *         numNulls = 500532),
       *       ColumnChunkStat(optional double c02,
       *         min = -1.7xxxxxE308,
       *         max = 1.7xxxxE308,
       *         hasNonNullValue = true,
       *         isNumNullsSet = true,
       *         numNulls = 498986)))))
       *
       * The difference between CPU and GPU: the CPU writer sets hasNonNullValue = false
       * and leaves min/max at 0.0, while the GPU writer records the actual min/max values.
       */
      assert(cpuStats.rowGroupStats.length == gpuStats.rowGroupStats.length)
      assert(cpuStats.rowGroupStats(0).columnStats(0).isNumNullsSet ==
        gpuStats.rowGroupStats(0).columnStats(0).isNumNullsSet)
      assert(cpuStats.rowGroupStats(0).columnStats(1).isNumNullsSet ==
        gpuStats.rowGroupStats(0).columnStats(1).isNumNullsSet)
      assert(cpuStats.rowGroupStats(0).columnStats(0).numNulls ==
        gpuStats.rowGroupStats(0).columnStats(0).numNulls)
      assert(cpuStats.rowGroupStats(0).columnStats(1).numNulls ==
        gpuStats.rowGroupStats(0).columnStats(1).numNulls)

      // write by GPU, read min/max by CPU
      val (floatMin, floatMax, doubleMin, doubleMax) = withTempPath { gpuFile =>
        withGpuSparkSession(spark => {
          // Read from the testing Parquet file and then write to a Parquet file
          spark.read.parquet(testDataFile.getAbsolutePath).coalesce(1)
            .write.mode("overwrite").parquet(gpuFile.getAbsolutePath)
        })

        val rowArray = withCpuSparkSession(spark => {
          // Read the GPU-written Parquet file and compute min/max on the CPU
          spark.read.parquet(gpuFile.getAbsolutePath)
            .selectExpr("min(c01)", "max(c01)", "min(c02)", "max(c02)").collect()
        })

        (rowArray(0)(0), rowArray(0)(1), rowArray(0)(2), rowArray(0)(3))
      }

      assertResult(floatMin)(gpuStats.rowGroupStats(0).columnStats(0).min)
      assertResult(floatMax)(gpuStats.rowGroupStats(0).columnStats(0).max)
      assertResult(doubleMin)(gpuStats.rowGroupStats(0).columnStats(1).min)
      assertResult(doubleMax)(gpuStats.rowGroupStats(0).columnStats(1).max)
    }
  }
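
The NaN-free generators matter because IEEE-754 NaN compares unordered with every value,
including itself, so float/double min/max column statistics are ill-defined once NaN appears
in the data (see the cudf issue referenced in the with-NaN test above). A minimal standalone
Scala illustration of the underlying comparison problem, independent of the test code:

    val nan = Double.NaN
    println(nan < 1.0)          // false
    println(nan > 1.0)          // false
    println(nan == nan)         // false: NaN is unordered, even against itself
    println(math.min(nan, 1.0)) // NaN: a naive running min/max is poisoned by NaN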

test("Statistics tests for Parquet files written by GPU, basic types") {
assume(false, "Move to scale test")
val schema =
@@ -0,0 +1,53 @@
/*
 * Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids

import org.apache.spark.sql.tests.datagen.{DataGen, GeneratorFunction, LocationToSeedMapping, RowLocation}

case class NonNaNFloatGenFunc(mapping: LocationToSeedMapping = null) extends GeneratorFunction {
  override def apply(rowLoc: RowLocation): Any = {
    val v = java.lang.Float.intBitsToFloat(DataGen.getRandomFor(mapping(rowLoc)).nextInt())
    if (v.isNaN) {
      1.toFloat // just use 1.0
    } else {
      v
    }
  }

  override def withLocationToSeedMapping(mapping: LocationToSeedMapping): GeneratorFunction =
    NonNaNFloatGenFunc(mapping)

  override def withValueRange(min: Any, max: Any): GeneratorFunction =
    throw new IllegalStateException("value ranges are not supported for Float yet")
}

case class NonNaNDoubleGenFunc(mapping: LocationToSeedMapping = null) extends GeneratorFunction {
  override def apply(rowLoc: RowLocation): Any = {
    val v = java.lang.Double.longBitsToDouble(DataGen.nextLong(rowLoc, mapping))
    if (v.isNaN) {
      1.toDouble // just use 1.0
    } else {
      v
    }
  }

  override def withLocationToSeedMapping(mapping: LocationToSeedMapping): GeneratorFunction =
    NonNaNDoubleGenFunc(mapping)

  override def withValueRange(min: Any, max: Any): GeneratorFunction =
    throw new IllegalStateException("value ranges are not supported for Double yet")
}
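
For reference, these generator functions plug into the data generator the same way the new
test above wires them in; a minimal sketch (the table name and row count are arbitrary
examples, and DBGen, addTable, setValueGen, and toDF are the datagen APIs already used in
the test):

    import org.apache.spark.sql.tests.datagen.DBGen

    val gen = DBGen()
    // One float column and one double column; 1000 rows is an arbitrary example count.
    val tab = gen.addTable("tab", "struct<c01: float, c02: double>", 1000)
    // Swap in the NaN-free generator functions defined above so the resulting
    // Parquet min/max column statistics are well-defined.
    tab("c01").setValueGen(NonNaNFloatGenFunc())
    tab("c02").setValueGen(NonNaNDoubleGenFunc())
    // tab.toDF(spark) then yields the DataFrame that the test writes out as Parquet.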
