Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support float/double castings for ORC reading [databricks] #6319

Merged
merged 14 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Name | Description | Default Value
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.json.read.enabled"></a>spark.rapids.sql.format.json.read.enabled|When set to true enables json input acceleration|false
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
<a name="sql.format.orc.floatTypesToString.enable"></a>spark.rapids.sql.format.orc.floatTypesToString.enable|The float/double numbers in GPU have different precision with CPU. So when casting them to string, the result of GPU is different from result of CPU spark.|true
<a name="sql.format.orc.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|2147483647
<a name="sql.format.orc.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small ORC files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None
<a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
Expand Down
70 changes: 70 additions & 0 deletions integration_tests/src/main/python/orc_cast_float_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
from data_gen import *
from pyspark.sql.types import *
from spark_session import with_cpu_session


@pytest.mark.parametrize('to_type', ['float', 'double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint'])
def test_casting_from_float_and_double(spark_tmp_path, to_type):
orc_path = spark_tmp_path + '/orc_casting_from_float_and_double'
data_gen = [('float_column', float_gen), ('double_column', double_gen)]
with_cpu_session(
lambda spark: gen_df(spark, data_gen).write.mode('overwrite').orc(orc_path)
)
schema_str = "float_column {}, double_column {}".format(to_type, to_type)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.schema(schema_str).orc(orc_path)
)


@pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None),
DoubleGen(max_exp=32, special_cases=[8.88e32, 9.99e33, 3.14159e34, 2.712e35, 2e36])])
def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen):
# ORC will assume the original double value in seconds, we need to convert them to
# timestamp(INT64 in micro-seconds).
#
# Since datetime library in python requires year >= 0, and UTC timestamp is start from 1970/1/1 00:00:00,
# that is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000 -> 6e10 -> 2e32.
# So we set max_exp = 32 in DoubleGen.
#
# The maximum valid positive number is INT64_MAX / 1e6 -> 1e12 -> 2e36, so we add some special cases
# from 2e33 to 2e36.
#
# In DoubleGen, special_case=None will generate some NaN, INF corner cases.

orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp'
with_cpu_session(
lambda spark: unary_op_df(spark, data_gen).write.mode('overwrite').orc(orc_path)
)
# the name of unique column is 'a', cast it into timestamp type
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.schema("a timestamp").orc(orc_path)
)


def test_casting_from_overflow_double_to_timestamp(spark_tmp_path):
orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp'
with_cpu_session(
lambda spark: unary_op_df(spark, DoubleGen(min_exp=37)).write.mode('overwrite').orc(orc_path)
)
assert_gpu_and_cpu_error(
df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(),
conf={},
error_message="ArithmeticException"
)
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ object GpuCast extends Arm {
}
}

private def castFloatingTypeToString(input: ColumnView): ColumnVector = {
private[rapids] def castFloatingTypeToString(input: ColumnView): ColumnVector = {
withResource(input.castTo(DType.STRING)) { cudfCast =>

// replace "e+" with "E"
Expand Down
152 changes: 152 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ case class GpuOrcScan(
}

object GpuOrcScan extends Arm {
var rapidsConf: RapidsConf = null

def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
val scan = scanMeta.wrapped
val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
Expand Down Expand Up @@ -151,6 +153,10 @@ object GpuOrcScan extends Arm {
.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}

// We need the value of 'isOrcFloatTypesToStringEnable' in 'canCast' method.
// So we get a reference of 'meta.conf'.
rapidsConf = meta.conf
firestarman marked this conversation as resolved.
Show resolved Hide resolved
}

private lazy val numericLevels = Seq(
Expand Down Expand Up @@ -186,6 +192,78 @@ object GpuOrcScan extends Arm {
}
}

/**
* Get the overflow flags in booleans.
* true means no overflow, while false means getting overflow.
*
* @param doubleMillis the input double column
* @param millis the long column casted from the doubleMillis
*/
private def getOverflowFlags(doubleMillis: ColumnView, millis: ColumnView): ColumnView = {
// No overflow when
// doubleMillis <= Long.MAX_VALUE &&
// doubleMillis >= Long.MIN_VALUE &&
// ((millis >= 0) == (doubleMillis >= 0))
val rangeCheck = withResource(Scalar.fromLong(Long.MaxValue)) { max =>
withResource(doubleMillis.lessOrEqualTo(max)) { upperCheck =>
withResource(Scalar.fromLong(Long.MinValue)) { min =>
withResource(doubleMillis.greaterOrEqualTo(min)) { lowerCheck =>
upperCheck.and(lowerCheck)
}
}
}
}
withResource(rangeCheck) { _ =>
val signCheck = withResource(Scalar.fromInt(0)) { zero =>
withResource(millis.greaterOrEqualTo(zero)) { longSign =>
withResource(doubleMillis.greaterOrEqualTo(zero)) { doubleSign =>
longSign.equalTo(doubleSign)
}
}
}
withResource(signCheck) { _ =>
rangeCheck.and(signCheck)
}
}
}

/**
* Borrowed from ORC "ConvertTreeReaderFactory"
* Scala does not support such numeric literal, so parse from string.
*/
private val MIN_LONG_AS_DOUBLE = java.lang.Double.valueOf("-0x1p63")

/**
* We cannot store Long.MAX_VALUE as a double without losing precision. Instead, we store
* Long.MAX_VALUE + 1 == -Long.MIN_VALUE, and then offset all comparisons by 1.
*/
private val MAX_LONG_AS_DOUBLE_PLUS_ONE = java.lang.Double.valueOf("0x1p63")

/**
* Return a boolean column indicates whether the rows in col can fix in a long.
* It assumes the input type is float or double.
*/
private def doubleCanFitInLong(col: ColumnView): ColumnVector = {
// It is true when
// (MIN_LONG_AS_DOUBLE - doubleValue < 1.0) &&
// (doubleValue < MAX_LONG_AS_DOUBLE_PLUS_ONE)
val lowRet = withResource(Scalar.fromDouble(MIN_LONG_AS_DOUBLE)) { sMin =>
withResource(Scalar.fromDouble(1.0)) { sOne =>
withResource(sMin.sub(col)) { diff =>
diff.lessThan(sOne)
}
}
}
withResource(lowRet) { _ =>
withResource(Scalar.fromDouble(MAX_LONG_AS_DOUBLE_PLUS_ONE)) { sMax =>
withResource(col.lessThan(sMax)) { highRet =>
lowRet.and(highRet)
}
}
}
}


/**
* Cast the column to the target type for ORC schema evolution.
* It is designed to support all the cases that `canCast` returns true.
Expand All @@ -211,6 +289,71 @@ object GpuOrcScan extends Arm {
} else {
downCastAnyInteger(col, toDt)
}

// float to bool/integral
case (DType.FLOAT32 | DType.FLOAT64, DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32
| DType.INT64) =>
// Follow the CPU ORC conversion:
// First replace rows that cannot fit in long with nulls,
// next convert to long,
// then down cast long to the target integral type.
val longDoubles = withResource(doubleCanFitInLong(col)) { fitLongs =>
col.copyWithBooleanColumnAsValidity(fitLongs)
}
withResource(longDoubles) { _ =>
withResource(longDoubles.castTo(DType.INT64)) { longs =>
toDt match {
case DType.BOOL8 => longs.castTo(toDt)
case DType.INT64 => longs.incRefCount()
case _ => downCastAnyInteger(longs, toDt)
}
}
}

// float/double to double/float
case (DType.FLOAT32 | DType.FLOAT64, DType.FLOAT32 | DType.FLOAT64) =>
col.castTo(toDt)

// float/double to string
// cuDF keep 9 decimal numbers after the decimal point, and CPU keeps more than 10.
// So when casting float/double to string, the result of GPU is different from CPU.
// We let a conf 'spark.rapids.sql.format.orc.floatTypesToString.enable' to control it's
// enable or not.
case (DType.FLOAT32 | DType.FLOAT64, DType.STRING) =>
GpuCast.castFloatingTypeToString(col)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please file a follow on issue for us to go back an see what we can do to fix this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please file a follow on issue for us to go back an see what we can do to fix this?

Ok, after merging this, I will file an issue to describe this problem.


// float/double -> timestamp
case (DType.FLOAT32 | DType.FLOAT64, DType.TIMESTAMP_MICROSECONDS) =>
// Follow the CPU ORC conversion.
// val doubleMillis = doubleValue * 1000,
// val millis = Math.round(doubleMillis)
// if (noOverflow) millis else null
val milliSeconds = withResource(Scalar.fromDouble(1000.0)) { thousand =>
// ORC assumes value is in seconds
withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis =>
withResource(doubleMillis.round()) { millis =>
withResource(getOverflowFlags(doubleMillis, millis)) { overflowFlags =>
millis.copyWithBooleanColumnAsValidity(overflowFlags)
}
}
}
}
// Cast milli-seconds to micro-seconds
firestarman marked this conversation as resolved.
Show resolved Hide resolved
// We need to pay attention that when convert (milliSeconds * 1000) to INT64, there may be
// INT64-overflow.
withResource(milliSeconds) { _ =>
// Test whether if there is long-overflow
// If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getDouble.toLong will return
// LONG_MAX. If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will
// throw an exception (as CPU code does).
Math.multiplyExact(milliSeconds.max().getDouble.toLong, 1000.toLong)
firestarman marked this conversation as resolved.
Show resolved Hide resolved
withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds =>
withResource(microSeconds.castTo(DType.INT64)) { longVec =>
longVec.castTo(DType.TIMESTAMP_MICROSECONDS)
}
}
}

// TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
case (f, t) =>
throw new QueryExecutionException(s"Unsupported type casting: $f -> $t")
Expand Down Expand Up @@ -239,6 +382,15 @@ object GpuOrcScan extends Arm {
}
case VARCHAR =>
to.getCategory == STRING

case FLOAT | DOUBLE =>
to.getCategory match {
case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | TIMESTAMP => true
case STRING => {
rapidsConf != null && rapidsConf.isOrcFloatTypesToStringEnable
firestarman marked this conversation as resolved.
Show resolved Hide resolved
}
case _ => false
}
// TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
case _ =>
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,13 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some docs to this that indicate what will happen if we run into this situation. For most configs when we are in this kind of a situation we fall back to the CPU, but here we will throw an exception and the job will fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have updated this in config.md.

conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
.doc("The float/double numbers in GPU have different precision with CPU. So when casting " +
firestarman marked this conversation as resolved.
Show resolved Hide resolved
"them to string, the result of GPU is different from result of CPU spark.")
.booleanConf
.createWithDefault(true)

val ORC_READER_TYPE = conf("spark.rapids.sql.format.orc.reader.type")
.doc("Sets the ORC reader type. We support different types that are optimized for " +
"different environments. The original Spark style reader can be selected by setting this " +
Expand Down Expand Up @@ -1854,6 +1861,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE)

lazy val isOrcFloatTypesToStringEnable: Boolean = get(ENABLE_ORC_FLOAT_TYPES_TO_STRING)

lazy val isOrcPerFileReadEnabled: Boolean =
RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.PERFILE

Expand Down